From 5d816a4e56d9206c613de92725405137b5ce62bf Mon Sep 17 00:00:00 2001 From: Sean Whalen <44679+seanthegeek@users.noreply.github.com> Date: Tue, 28 Apr 2026 00:58:36 -0400 Subject: [PATCH] Offload mailbox layer to mailsuite>=2.0.0 (#741) mailsuite 2.0.0 extracted the IMAP, Microsoft Graph, Gmail, and Maildir connections out of parsedmarc into mailsuite.mailbox so other projects can reuse the same provider-agnostic interface. Replace the parsedmarc/mail submodules with a thin re-export of mailsuite.mailbox and drop the duplicated implementations. Per the migration note in seanthegeek/mailsuite#22, pass token_cache_name="parsedmarc" so existing AuthenticationRecord caches on disk continue to work without re-prompting users to authenticate. The existing graph_url config knob is forwarded unchanged. Drop direct dependencies that are now installed transitively via mailsuite[gmail,msgraph] (msgraph-core, imapclient, google-*). The extras are pulled in non-optionally so Gmail and Microsoft Graph support remain available out of the box. Drop nine test classes that were exercising mailsuite-side implementation internals (TestGmailConnection, TestGraphConnection, TestImapConnection, the _get_creds/_generate_credential half of TestGmailAuthModes, TestImapFallbacks, TestMSGraphFolderFallback, TestMaildirConnection, TestMaildirReportsFolder, TestMaildirUidHandling, TestTokenParentDirCreation); these are mailsuite's tests now. The CLI integration tests that mock parsedmarc.cli.{IMAP,Gmail,MSGraph}Connection are kept. Co-authored-by: Sean Whalen Co-authored-by: Claude Opus 4.7 (1M context) --- CHANGELOG.md | 8 + parsedmarc/cli.py | 3 +- parsedmarc/constants.py | 2 +- parsedmarc/mail/__init__.py | 27 +- parsedmarc/mail/gmail.py | 196 ----- parsedmarc/mail/graph.py | 339 -------- parsedmarc/mail/imap.py | 117 --- parsedmarc/mail/mailbox_connection.py | 32 - parsedmarc/mail/maildir.py | 105 --- pyproject.toml | 9 +- tests.py | 1073 +------------------------ 11 files changed, 33 insertions(+), 1878 deletions(-) delete mode 100644 parsedmarc/mail/gmail.py delete mode 100644 parsedmarc/mail/graph.py delete mode 100644 parsedmarc/mail/imap.py delete mode 100644 parsedmarc/mail/mailbox_connection.py delete mode 100644 parsedmarc/mail/maildir.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 924fd5e..9e87070 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,13 @@ # Changelog +## 9.11.0 + +### Changes + +- **Mailbox backends now live in `mailsuite>=2.0.0`.** The `IMAPConnection`, `MSGraphConnection`, `GmailConnection`, `MaildirConnection`, and `MailboxConnection` implementations were extracted into [mailsuite 2.0.0](https://github.com/seanthegeek/mailsuite/releases/tag/2.0.0) so other projects can reuse the same provider-agnostic interface. parsedmarc's `parsedmarc.mail` package is now a thin re-export of `mailsuite.mailbox`; existing imports (`from parsedmarc.mail import IMAPConnection`, etc.) continue to work unchanged. The CLI passes `token_cache_name="parsedmarc"` and forwards the existing `[msgraph] graph_url` config knob so cached `AuthenticationRecord`s and tokens carry over without re-prompting. +- **MSGraph backend rewritten on `msgraph-sdk` (kiota-based).** mailsuite 2.0 replaces the retired `msgraph-core==0.2.2` REST wrapper with the supported `msgraph-sdk` client. End-user behavior is unchanged. +- **Direct dependencies on `msgraph-core`, `imapclient`, `google-api-core`, `google-api-python-client`, `google-auth-httplib2`, `google-auth-oauthlib`, and `google-auth` removed.** They are now installed transitively via `mailsuite[gmail,msgraph]>=2.0.0`, which is included as a non-optional dependency so Gmail and Microsoft Graph support remain available out of the box. + ## 9.10.3 ### Fixed diff --git a/parsedmarc/cli.py b/parsedmarc/cli.py index 25dccf8..17056d9 100644 --- a/parsedmarc/cli.py +++ b/parsedmarc/cli.py @@ -43,12 +43,12 @@ from parsedmarc import ( ) from parsedmarc.log import logger from parsedmarc.mail import ( + AuthMethod, GmailConnection, IMAPConnection, MaildirConnection, MSGraphConnection, ) -from parsedmarc.mail.graph import AuthMethod from parsedmarc.types import ParsingResults from parsedmarc.utils import ( InvalidIPinfoAPIKey, @@ -2150,6 +2150,7 @@ def _main(): token_file=opts.graph_token_file, allow_unencrypted_storage=bool(opts.graph_allow_unencrypted_storage), graph_url=opts.graph_url, + token_cache_name="parsedmarc", ) except Exception: diff --git a/parsedmarc/constants.py b/parsedmarc/constants.py index c0a61b5..53011c9 100644 --- a/parsedmarc/constants.py +++ b/parsedmarc/constants.py @@ -1,4 +1,4 @@ -__version__ = "9.10.3" +__version__ = "9.11.0" USER_AGENT = f"parsedmarc/{__version__}" diff --git a/parsedmarc/mail/__init__.py b/parsedmarc/mail/__init__.py index 79939cc..24835ab 100644 --- a/parsedmarc/mail/__init__.py +++ b/parsedmarc/mail/__init__.py @@ -1,13 +1,26 @@ -from parsedmarc.mail.mailbox_connection import MailboxConnection -from parsedmarc.mail.graph import MSGraphConnection -from parsedmarc.mail.gmail import GmailConnection -from parsedmarc.mail.imap import IMAPConnection -from parsedmarc.mail.maildir import MaildirConnection +# -*- coding: utf-8 -*- + +"""Mailbox connections for parsedmarc. + +The implementations live in :mod:`mailsuite.mailbox` (extracted from +parsedmarc in mailsuite 2.0.0). This module re-exports them so +``parsedmarc.mail`` remains a stable import path for downstream consumers. +""" + +from mailsuite.mailbox import ( + GmailConnection, + IMAPConnection, + MailboxConnection, + MaildirConnection, + MSGraphConnection, +) +from mailsuite.mailbox.graph import AuthMethod __all__ = [ - "MailboxConnection", - "MSGraphConnection", + "AuthMethod", "GmailConnection", "IMAPConnection", + "MailboxConnection", "MaildirConnection", + "MSGraphConnection", ] diff --git a/parsedmarc/mail/gmail.py b/parsedmarc/mail/gmail.py deleted file mode 100644 index fa3af38..0000000 --- a/parsedmarc/mail/gmail.py +++ /dev/null @@ -1,196 +0,0 @@ -# -*- coding: utf-8 -*- - -from __future__ import annotations - -from base64 import urlsafe_b64decode -from functools import lru_cache -from pathlib import Path -from time import sleep -from typing import List - -from google.auth.transport.requests import Request -from google.oauth2.credentials import Credentials -from google.oauth2 import service_account -from google_auth_oauthlib.flow import InstalledAppFlow -from googleapiclient.discovery import build -from googleapiclient.errors import HttpError - -from parsedmarc.log import logger -from parsedmarc.mail.mailbox_connection import MailboxConnection - - -def _get_creds( - token_file, - credentials_file, - scopes, - oauth2_port, - auth_mode="installed_app", - service_account_user=None, -): - normalized_auth_mode = (auth_mode or "installed_app").strip().lower() - if normalized_auth_mode == "service_account": - creds = service_account.Credentials.from_service_account_file( - credentials_file, - scopes=scopes, - ) - if service_account_user: - creds = creds.with_subject(service_account_user) - return creds - if normalized_auth_mode != "installed_app": - raise ValueError( - f"Unsupported Gmail auth_mode '{auth_mode}'. " - "Expected 'installed_app' or 'service_account'." - ) - - creds = None - - if Path(token_file).exists(): - creds = Credentials.from_authorized_user_file(token_file, scopes) - - # If there are no (valid) credentials available, let the user log in. - if not creds or not creds.valid: - if creds and creds.expired and creds.refresh_token: - creds.refresh(Request()) - else: - flow = InstalledAppFlow.from_client_secrets_file(credentials_file, scopes) - creds = flow.run_local_server(open_browser=False, oauth2_port=oauth2_port) - # Save the credentials for the next run - Path(token_file).parent.mkdir(parents=True, exist_ok=True) - with Path(token_file).open("w") as token: - token.write(creds.to_json()) - return creds - - -class GmailConnection(MailboxConnection): - def __init__( - self, - token_file: str, - credentials_file: str, - scopes: List[str], - include_spam_trash: bool, - reports_folder: str, - oauth2_port: int, - paginate_messages: bool, - auth_mode: str = "installed_app", - service_account_user: str | None = None, - ): - creds = _get_creds( - token_file, - credentials_file, - scopes, - oauth2_port, - auth_mode=auth_mode, - service_account_user=service_account_user, - ) - self.service = build("gmail", "v1", credentials=creds) - self.include_spam_trash = include_spam_trash - self.reports_label_id = self._find_label_id_for_label(reports_folder) - self.paginate_messages = paginate_messages - - def create_folder(self, folder_name: str): - # Gmail doesn't support the name Archive - if folder_name == "Archive": - return - - logger.debug(f"Creating label {folder_name}") - request_body = {"name": folder_name, "messageListVisibility": "show"} - try: - self.service.users().labels().create( - userId="me", body=request_body - ).execute() - except HttpError as e: - if e.status_code == 409: - logger.debug(f"Folder {folder_name} already exists, skipping creation") - else: - raise e - - def _fetch_all_message_ids(self, reports_label_id, page_token=None, since=None): - if since: - results = ( - self.service.users() - .messages() - .list( - userId="me", - includeSpamTrash=self.include_spam_trash, - labelIds=[reports_label_id], - pageToken=page_token, - q=f"after:{since}", - ) - .execute() - ) - else: - results = ( - self.service.users() - .messages() - .list( - userId="me", - includeSpamTrash=self.include_spam_trash, - labelIds=[reports_label_id], - pageToken=page_token, - ) - .execute() - ) - messages = results.get("messages", []) - for message in messages: - yield message["id"] - - if "nextPageToken" in results and self.paginate_messages: - yield from self._fetch_all_message_ids( - reports_label_id, results["nextPageToken"] - ) - - def fetch_messages(self, reports_folder: str, **kwargs) -> List[str]: - reports_label_id = self._find_label_id_for_label(reports_folder) - since = kwargs.get("since") - if since: - return [ - id for id in self._fetch_all_message_ids(reports_label_id, since=since) - ] - else: - return [id for id in self._fetch_all_message_ids(reports_label_id)] - - def fetch_message(self, message_id) -> str: - msg = ( - self.service.users() - .messages() - .get(userId="me", id=message_id, format="raw") - .execute() - ) - return urlsafe_b64decode(msg["raw"]).decode(errors="replace") - - def delete_message(self, message_id: str): - self.service.users().messages().delete(userId="me", id=message_id).execute() - - def move_message(self, message_id: str, folder_name: str): - label_id = self._find_label_id_for_label(folder_name) - logger.debug(f"Moving message UID {message_id} to {folder_name}") - request_body = { - "addLabelIds": [label_id], - "removeLabelIds": [self.reports_label_id], - } - self.service.users().messages().modify( - userId="me", id=message_id, body=request_body - ).execute() - - def keepalive(self): - # Not needed - pass - - def watch(self, check_callback, check_timeout, config_reloading=None): - """Checks the mailbox for new messages every n seconds""" - while True: - if config_reloading and config_reloading(): - return - sleep(check_timeout) - if config_reloading and config_reloading(): - return - check_callback(self) - - @lru_cache(maxsize=10) - def _find_label_id_for_label(self, label_name: str) -> str: - results = self.service.users().labels().list(userId="me").execute() - labels = results.get("labels", []) - for label in labels: - if label_name == label["id"] or label_name == label["name"]: - return label["id"] - return "" diff --git a/parsedmarc/mail/graph.py b/parsedmarc/mail/graph.py deleted file mode 100644 index 20836f2..0000000 --- a/parsedmarc/mail/graph.py +++ /dev/null @@ -1,339 +0,0 @@ -# -*- coding: utf-8 -*- - -from __future__ import annotations - -from enum import Enum -from functools import lru_cache -from pathlib import Path -from time import sleep -from typing import Any, List, Optional, Union - -from azure.identity import ( - UsernamePasswordCredential, - DeviceCodeCredential, - ClientSecretCredential, - CertificateCredential, - TokenCachePersistenceOptions, - AuthenticationRecord, -) -from msgraph.core import GraphClient -from requests.exceptions import RequestException - -from parsedmarc.log import logger -from parsedmarc.mail.mailbox_connection import MailboxConnection - -GRAPH_REQUEST_RETRY_ATTEMPTS = 3 -GRAPH_REQUEST_RETRY_DELAY_SECONDS = 5 - - -class AuthMethod(Enum): - DeviceCode = 1 - UsernamePassword = 2 - ClientSecret = 3 - Certificate = 4 - - -def _get_cache_args(token_path: Path, allow_unencrypted_storage): - cache_args: dict[str, Any] = { - "cache_persistence_options": TokenCachePersistenceOptions( - name="parsedmarc", allow_unencrypted_storage=allow_unencrypted_storage - ) - } - auth_record = _load_token(token_path) - if auth_record: - cache_args["authentication_record"] = AuthenticationRecord.deserialize( - auth_record - ) - return cache_args - - -def _load_token(token_path: Path) -> Optional[str]: - if not token_path.exists(): - return None - with token_path.open() as token_file: - return token_file.read() - - -def _cache_auth_record(record: AuthenticationRecord, token_path: Path): - token = record.serialize() - token_path.parent.mkdir(parents=True, exist_ok=True) - with token_path.open("w") as token_file: - token_file.write(token) - - -def _generate_credential(auth_method: str, token_path: Path, **kwargs): - if auth_method == AuthMethod.DeviceCode.name: - credential = DeviceCodeCredential( - client_id=kwargs["client_id"], - disable_automatic_authentication=True, - tenant_id=kwargs["tenant_id"], - **_get_cache_args( - token_path, - allow_unencrypted_storage=kwargs["allow_unencrypted_storage"], - ), - ) - elif auth_method == AuthMethod.UsernamePassword.name: - credential = UsernamePasswordCredential( - client_id=kwargs["client_id"], - client_credential=kwargs["client_secret"], - disable_automatic_authentication=True, - username=kwargs["username"], - password=kwargs["password"], - **_get_cache_args( - token_path, - allow_unencrypted_storage=kwargs["allow_unencrypted_storage"], - ), - ) - elif auth_method == AuthMethod.ClientSecret.name: - credential = ClientSecretCredential( - client_id=kwargs["client_id"], - tenant_id=kwargs["tenant_id"], - client_secret=kwargs["client_secret"], - ) - elif auth_method == AuthMethod.Certificate.name: - cert_path = kwargs.get("certificate_path") - if not cert_path: - raise ValueError( - "certificate_path is required when auth_method is 'Certificate'" - ) - credential = CertificateCredential( - client_id=kwargs["client_id"], - tenant_id=kwargs["tenant_id"], - certificate_path=cert_path, - password=kwargs.get("certificate_password"), - ) - else: - raise RuntimeError(f"Auth method {auth_method} not found") - return credential - - -class MSGraphConnection(MailboxConnection): - _WELL_KNOWN_FOLDERS = { - "inbox": "inbox", - "archive": "archive", - "drafts": "drafts", - "sentitems": "sentitems", - "deleteditems": "deleteditems", - "junkemail": "junkemail", - } - - def __init__( - self, - auth_method: str, - mailbox: str, - graph_url: str, - client_id: str, - client_secret: Optional[str], - username: Optional[str], - password: Optional[str], - tenant_id: str, - token_file: str, - allow_unencrypted_storage: bool, - certificate_path: Optional[str] = None, - certificate_password: Optional[Union[str, bytes]] = None, - ): - token_path = Path(token_file) - credential = _generate_credential( - auth_method, - client_id=client_id, - client_secret=client_secret, - certificate_path=certificate_path, - certificate_password=certificate_password, - username=username, - password=password, - tenant_id=tenant_id, - token_path=token_path, - allow_unencrypted_storage=allow_unencrypted_storage, - ) - client_params = { - "credential": credential, - "cloud": graph_url, - } - if not isinstance(credential, (ClientSecretCredential, CertificateCredential)): - scopes = ["Mail.ReadWrite"] - # Detect if mailbox is shared - if mailbox and username and username != mailbox: - scopes = ["Mail.ReadWrite.Shared"] - auth_record = credential.authenticate(scopes=scopes) - _cache_auth_record(auth_record, token_path) - client_params["scopes"] = scopes - - self._client = GraphClient(**client_params) - self.mailbox_name = mailbox - - def _request_with_retries(self, method_name: str, *args, **kwargs): - for attempt in range(1, GRAPH_REQUEST_RETRY_ATTEMPTS + 1): - try: - return getattr(self._client, method_name)(*args, **kwargs) - except RequestException as error: - if attempt == GRAPH_REQUEST_RETRY_ATTEMPTS: - raise - logger.warning( - "Transient MS Graph %s error on attempt %s/%s: %s", - method_name.upper(), - attempt, - GRAPH_REQUEST_RETRY_ATTEMPTS, - error, - ) - sleep(GRAPH_REQUEST_RETRY_DELAY_SECONDS) - raise RuntimeError("no retry attempts configured") - - def create_folder(self, folder_name: str): - sub_url = "" - path_parts = folder_name.split("/") - if len(path_parts) > 1: # Folder is a subFolder - parent_folder_id = None - for folder in path_parts[:-1]: - parent_folder_id = self._find_folder_id_with_parent( - folder, parent_folder_id - ) - sub_url = f"/{parent_folder_id}/childFolders" - folder_name = path_parts[-1] - - request_body = {"displayName": folder_name} - request_url = f"/users/{self.mailbox_name}/mailFolders{sub_url}" - resp = self._request_with_retries("post", request_url, json=request_body) - if resp.status_code == 409: - logger.debug(f"Folder {folder_name} already exists, skipping creation") - elif resp.status_code == 201: - logger.debug(f"Created folder {folder_name}") - else: - logger.warning(f"Unknown response {resp.status_code} {resp.json()}") - - def fetch_messages(self, reports_folder: str, **kwargs) -> List[str]: - """Returns a list of message UIDs in the specified folder""" - folder_id = self._find_folder_id_from_folder_path(reports_folder) - url = f"/users/{self.mailbox_name}/mailFolders/{folder_id}/messages" - since = kwargs.get("since") - if not since: - since = None - batch_size = kwargs.get("batch_size") - if not batch_size: - batch_size = 0 - emails = self._get_all_messages(url, batch_size, since) - return [email["id"] for email in emails] - - def _get_all_messages(self, url, batch_size, since): - messages: list - params: dict[str, Union[str, int]] = {"$select": "id"} - if since: - params["$filter"] = f"receivedDateTime ge {since}" - if batch_size and batch_size > 0: - params["$top"] = batch_size - else: - params["$top"] = 100 - result = self._request_with_retries("get", url, params=params) - if result.status_code != 200: - raise RuntimeError(f"Failed to fetch messages {result.text}") - messages = result.json()["value"] - # Loop if next page is present and not obtained message limit. - while "@odata.nextLink" in result.json() and ( - since is not None or (batch_size == 0 or batch_size - len(messages) > 0) - ): - result = self._request_with_retries("get", result.json()["@odata.nextLink"]) - if result.status_code != 200: - raise RuntimeError(f"Failed to fetch messages {result.text}") - messages.extend(result.json()["value"]) - return messages - - def mark_message_read(self, message_id: str): - """Marks a message as read""" - url = f"/users/{self.mailbox_name}/messages/{message_id}" - resp = self._request_with_retries("patch", url, json={"isRead": "true"}) - if resp.status_code != 200: - raise RuntimeWarning( - f"Failed to mark message read{resp.status_code}: {resp.json()}" - ) - - def fetch_message(self, message_id: str, **kwargs): - url = f"/users/{self.mailbox_name}/messages/{message_id}/$value" - result = self._request_with_retries("get", url) - if result.status_code != 200: - raise RuntimeWarning( - f"Failed to fetch message{result.status_code}: {result.json()}" - ) - mark_read = kwargs.get("mark_read") - if mark_read: - self.mark_message_read(message_id) - return result.text - - def delete_message(self, message_id: str): - url = f"/users/{self.mailbox_name}/messages/{message_id}" - resp = self._request_with_retries("delete", url) - if resp.status_code != 204: - raise RuntimeWarning( - f"Failed to delete message {resp.status_code}: {resp.json()}" - ) - - def move_message(self, message_id: str, folder_name: str): - folder_id = self._find_folder_id_from_folder_path(folder_name) - request_body = {"destinationId": folder_id} - url = f"/users/{self.mailbox_name}/messages/{message_id}/move" - resp = self._request_with_retries("post", url, json=request_body) - if resp.status_code != 201: - raise RuntimeWarning( - f"Failed to move message {resp.status_code}: {resp.json()}" - ) - - def keepalive(self): - # Not needed - pass - - def watch(self, check_callback, check_timeout, config_reloading=None): - """Checks the mailbox for new messages every n seconds""" - while True: - if config_reloading and config_reloading(): - return - sleep(check_timeout) - if config_reloading and config_reloading(): - return - check_callback(self) - - @lru_cache(maxsize=10) - def _find_folder_id_from_folder_path(self, folder_name: str) -> str: - path_parts = folder_name.split("/") - parent_folder_id = None - if len(path_parts) > 1: - for folder in path_parts[:-1]: - folder_id = self._find_folder_id_with_parent(folder, parent_folder_id) - parent_folder_id = folder_id - return self._find_folder_id_with_parent(path_parts[-1], parent_folder_id) - else: - return self._find_folder_id_with_parent(folder_name, None) - - def _get_well_known_folder_id(self, folder_name: str) -> Optional[str]: - folder_key = folder_name.lower().replace(" ", "").replace("-", "") - alias = self._WELL_KNOWN_FOLDERS.get(folder_key) - if alias is None: - return None - - url = f"/users/{self.mailbox_name}/mailFolders/{alias}?$select=id,displayName" - folder_resp = self._request_with_retries("get", url) - if folder_resp.status_code != 200: - return None - payload = folder_resp.json() - return payload.get("id") - - def _find_folder_id_with_parent( - self, folder_name: str, parent_folder_id: Optional[str] - ): - sub_url = "" - if parent_folder_id is not None: - sub_url = f"/{parent_folder_id}/childFolders" - url = f"/users/{self.mailbox_name}/mailFolders{sub_url}" - filter = f"?$filter=displayName eq '{folder_name}'" - folders_resp = self._request_with_retries("get", url + filter) - if folders_resp.status_code != 200: - if parent_folder_id is None: - well_known_folder_id = self._get_well_known_folder_id(folder_name) - if well_known_folder_id: - return well_known_folder_id - raise RuntimeWarning(f"Failed to list folders.{folders_resp.json()}") - folders: list = folders_resp.json()["value"] - matched_folders = [ - folder for folder in folders if folder["displayName"] == folder_name - ] - if len(matched_folders) == 0: - raise RuntimeError(f"folder {folder_name} not found") - selected_folder = matched_folders[0] - return selected_folder["id"] diff --git a/parsedmarc/mail/imap.py b/parsedmarc/mail/imap.py deleted file mode 100644 index 5201fa7..0000000 --- a/parsedmarc/mail/imap.py +++ /dev/null @@ -1,117 +0,0 @@ -# -*- coding: utf-8 -*- - -from __future__ import annotations - -from typing import cast - -from time import sleep - -from imapclient.exceptions import IMAPClientError -from mailsuite.imap import IMAPClient -from socket import timeout - -from parsedmarc.log import logger -from parsedmarc.mail.mailbox_connection import MailboxConnection - - -class IMAPConnection(MailboxConnection): - def __init__( - self, - host: str, - user: str, - password: str, - port: int = 993, - ssl: bool = True, - verify: bool = True, - timeout: int = 30, - max_retries: int = 4, - ): - self._username = user - self._password = password - self._verify = verify - self._client = IMAPClient( - host, - user, - password, - port=port, - ssl=ssl, - verify=verify, - timeout=timeout, - max_retries=max_retries, - ) - - def create_folder(self, folder_name: str): - self._client.create_folder(folder_name) - - def fetch_messages(self, reports_folder: str, **kwargs): - self._client.select_folder(reports_folder) - since = kwargs.get("since") - if since is not None: - return self._client.search(f"SINCE {since}") - else: - return self._client.search() - - def fetch_message(self, message_id: int): - return cast(str, self._client.fetch_message(message_id, parse=False)) - - def delete_message(self, message_id: int): - try: - self._client.delete_messages([message_id]) - except IMAPClientError as error: - logger.warning( - "IMAP delete fallback for message %s due to server error: %s", - message_id, - error, - ) - self._client.add_flags([message_id], [r"\Deleted"], silent=True) - self._client.expunge() - - def move_message(self, message_id: int, folder_name: str): - try: - self._client.move_messages([message_id], folder_name) - except IMAPClientError as error: - logger.warning( - "IMAP move fallback for message %s due to server error: %s", - message_id, - error, - ) - self._client.copy([message_id], folder_name) - self.delete_message(message_id) - - def keepalive(self): - self._client.noop() - - def watch(self, check_callback, check_timeout, config_reloading=None): - """ - Use an IDLE IMAP connection to parse incoming emails, - and pass the results to a callback function - """ - - # IDLE callback sends IMAPClient object, - # send back the imap connection object instead - def idle_callback_wrapper(client: IMAPClient): - self._client = client - check_callback(self) - - while True: - if config_reloading and config_reloading(): - return - try: - IMAPClient( - host=self._client.host, - username=self._username, - password=self._password, - port=self._client.port, - ssl=self._client.ssl, - verify=self._verify, - idle_callback=idle_callback_wrapper, - idle_timeout=check_timeout, - ) - except (timeout, IMAPClientError): - logger.warning("IMAP connection timeout. Reconnecting...") - sleep(check_timeout) - except Exception as e: - logger.warning("IMAP connection error. {0}. Reconnecting...".format(e)) - sleep(check_timeout) - if config_reloading and config_reloading(): - return diff --git a/parsedmarc/mail/mailbox_connection.py b/parsedmarc/mail/mailbox_connection.py deleted file mode 100644 index 6c94336..0000000 --- a/parsedmarc/mail/mailbox_connection.py +++ /dev/null @@ -1,32 +0,0 @@ -# -*- coding: utf-8 -*- - -from __future__ import annotations - -from abc import ABC - - -class MailboxConnection(ABC): - """ - Interface for a mailbox connection - """ - - def create_folder(self, folder_name: str): - raise NotImplementedError - - def fetch_messages(self, reports_folder: str, **kwargs): - raise NotImplementedError - - def fetch_message(self, message_id) -> str: - raise NotImplementedError - - def delete_message(self, message_id): - raise NotImplementedError - - def move_message(self, message_id, folder_name: str): - raise NotImplementedError - - def keepalive(self): - raise NotImplementedError - - def watch(self, check_callback, check_timeout, config_reloading=None): - raise NotImplementedError diff --git a/parsedmarc/mail/maildir.py b/parsedmarc/mail/maildir.py deleted file mode 100644 index 3e7a4f0..0000000 --- a/parsedmarc/mail/maildir.py +++ /dev/null @@ -1,105 +0,0 @@ -# -*- coding: utf-8 -*- - -from __future__ import annotations - -import mailbox -import os -from time import sleep -from typing import Dict - -from parsedmarc.log import logger -from parsedmarc.mail.mailbox_connection import MailboxConnection - - -class MaildirConnection(MailboxConnection): - def __init__( - self, - maildir_path: str, - maildir_create: bool = False, - ): - self._maildir_path = maildir_path - self._maildir_create = maildir_create - try: - maildir_owner = os.stat(maildir_path).st_uid - except OSError: - maildir_owner = None - current_uid = os.getuid() - if maildir_owner is not None and current_uid != maildir_owner: - if current_uid == 0: - try: - logger.warning( - "Switching uid to {} to access Maildir".format(maildir_owner) - ) - os.setuid(maildir_owner) - except OSError as e: - logger.warning( - "Failed to switch uid to {}: {}".format(maildir_owner, e) - ) - else: - logger.warning( - "Runtime uid {} differs from maildir {} owner {}. " - "Access may fail if permissions are insufficient.".format( - current_uid, maildir_path, maildir_owner - ) - ) - if maildir_create: - for subdir in ("cur", "new", "tmp"): - os.makedirs(os.path.join(maildir_path, subdir), exist_ok=True) - self._client = mailbox.Maildir(maildir_path, create=maildir_create) - self._active_folder: mailbox.Maildir = self._client - self._subfolder_client: Dict[str, mailbox.Maildir] = {} - - def _get_folder(self, folder_name: str) -> mailbox.Maildir: - """Return a cached subfolder handle, creating it if needed.""" - if folder_name not in self._subfolder_client: - self._subfolder_client[folder_name] = self._client.add_folder(folder_name) - return self._subfolder_client[folder_name] - - def create_folder(self, folder_name: str): - self._get_folder(folder_name) - - def fetch_messages(self, reports_folder: str, **kwargs): - if reports_folder and reports_folder != "INBOX": - self._active_folder = self._get_folder(reports_folder) - else: - self._active_folder = self._client - return self._active_folder.keys() - - def fetch_message(self, message_id: str, **kwargs) -> str: - msg = self._active_folder.get(message_id) - if msg is None: - return "" - msg_str = msg.as_string() - if kwargs.get("mark_read"): - # Maildir spec: a message is "read" once it has been moved out of - # new/ into cur/ with the "S" (Seen) flag set in its info field. - msg.set_subdir("cur") - msg.add_flag("S") - self._active_folder[message_id] = msg - return msg_str or "" - - def delete_message(self, message_id: str): - self._active_folder.remove(message_id) - - def move_message(self, message_id: str, folder_name: str): - message_data = self._active_folder.get(message_id) - if message_data is None: - return - dest = self._get_folder(folder_name) - dest.add(message_data) - self._active_folder.remove(message_id) - - def keepalive(self): - return - - def watch(self, check_callback, check_timeout, config_reloading=None): - while True: - if config_reloading and config_reloading(): - return - try: - check_callback(self) - except Exception as e: - logger.warning("Maildir init error. {0}".format(e)) - if config_reloading and config_reloading(): - return - sleep(check_timeout) diff --git a/pyproject.toml b/pyproject.toml index 458ee06..195e94f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -39,17 +39,10 @@ dependencies = [ "elasticsearch-dsl==7.4.0", "elasticsearch<7.14.0", "expiringdict>=1.1.4", - "google-api-core>=2.4.0", - "google-api-python-client>=2.35.0", - "google-auth-httplib2>=0.1.0", - "google-auth-oauthlib>=0.4.6", - "google-auth>=2.3.3", - "imapclient>=3.1.0", "kafka-python-ng>=2.2.2", "lxml>=4.4.0", - "mailsuite>=1.11.2", + "mailsuite[gmail,msgraph]>=2.0.0", "maxminddb>=2.0.0", - "msgraph-core==0.2.2", "opensearch-py>=2.4.2,<=4.0.0", "publicsuffixlist>=0.10.0", "pygelf>=0.4.2", diff --git a/tests.py b/tests.py index dfd2933..4ae8d49 100755 --- a/tests.py +++ b/tests.py @@ -10,32 +10,18 @@ import signal import sys import tempfile import unittest -from base64 import urlsafe_b64encode from configparser import ConfigParser from glob import glob from pathlib import Path -from tempfile import NamedTemporaryFile, TemporaryDirectory +from tempfile import NamedTemporaryFile from typing import cast from types import SimpleNamespace from unittest.mock import MagicMock, patch from lxml import etree # type: ignore[import-untyped] -from googleapiclient.errors import HttpError -from httplib2 import Response -from imapclient.exceptions import IMAPClientError import parsedmarc import parsedmarc.cli -from parsedmarc.mail.gmail import GmailConnection -from parsedmarc.mail.gmail import _get_creds -from parsedmarc.mail.graph import MSGraphConnection -from parsedmarc.mail.graph import _generate_credential -from parsedmarc.mail.graph import _get_cache_args -from parsedmarc.mail.graph import _load_token -from parsedmarc.mail.imap import IMAPConnection -import parsedmarc.mail.gmail as gmail_module -import parsedmarc.mail.graph as graph_module -import parsedmarc.mail.imap as imap_module import parsedmarc.elastic import parsedmarc.opensearch as opensearch_module import parsedmarc.utils @@ -660,677 +646,11 @@ hosts = localhost mock_save_forensic_opensearch.assert_called_once() -class _FakeGraphResponse: - def __init__(self, status_code, payload=None, text=""): - self.status_code = status_code - self._payload = payload or {} - self.text = text - - def json(self): - return self._payload - - class _BreakLoop(BaseException): pass -class TestGmailConnection(unittest.TestCase): - def _build_connection(self, *, paginate=True): - connection = GmailConnection.__new__(GmailConnection) - connection.include_spam_trash = False - connection.reports_label_id = "REPORTS" - connection.paginate_messages = paginate - connection.service = MagicMock() - return connection - - def testFindLabelId(self): - connection = self._build_connection() - labels_api = connection.service.users.return_value.labels.return_value - labels_api.list.return_value.execute.return_value = { - "labels": [ - {"id": "INBOX", "name": "INBOX"}, - {"id": "REPORTS", "name": "Reports"}, - ] - } - self.assertEqual(connection._find_label_id_for_label("Reports"), "REPORTS") - self.assertEqual(connection._find_label_id_for_label("MISSING"), "") - - def testFetchMessagesWithPagination(self): - connection = self._build_connection(paginate=True) - messages_api = connection.service.users.return_value.messages.return_value - - def list_side_effect(**kwargs): - response = MagicMock() - if kwargs.get("pageToken") is None: - response.execute.return_value = { - "messages": [{"id": "a"}, {"id": "b"}], - "nextPageToken": "n1", - } - else: - response.execute.return_value = {"messages": [{"id": "c"}]} - return response - - messages_api.list.side_effect = list_side_effect - connection._find_label_id_for_label = MagicMock(return_value="REPORTS") - self.assertEqual(connection.fetch_messages("Reports"), ["a", "b", "c"]) - - def testFetchMessageDecoding(self): - connection = self._build_connection() - messages_api = connection.service.users.return_value.messages.return_value - raw = urlsafe_b64encode(b"Subject: test\n\nbody").decode() - messages_api.get.return_value.execute.return_value = {"raw": raw} - content = connection.fetch_message("m1") - self.assertIn("Subject: test", content) - - def testMoveAndDeleteMessage(self): - connection = self._build_connection() - connection._find_label_id_for_label = MagicMock(return_value="ARCHIVE") - messages_api = connection.service.users.return_value.messages.return_value - messages_api.modify.return_value.execute.return_value = {} - connection.move_message("m1", "Archive") - messages_api.modify.assert_called_once() - connection.delete_message("m1") - messages_api.delete.assert_called_once_with(userId="me", id="m1") - messages_api.delete.return_value.execute.assert_called_once() - - def testGetCredsFromTokenFile(self): - creds = MagicMock() - creds.valid = True - with NamedTemporaryFile("w", delete=False) as token_file: - token_file.write("{}") - token_path = token_file.name - try: - with patch.object( - gmail_module.Credentials, - "from_authorized_user_file", - return_value=creds, - ): - returned = _get_creds(token_path, "credentials.json", ["scope"], 8080) - finally: - os.remove(token_path) - self.assertEqual(returned, creds) - - def testGetCredsWithOauthFlow(self): - expired_creds = MagicMock() - expired_creds.valid = False - expired_creds.expired = False - expired_creds.refresh_token = None - new_creds = MagicMock() - new_creds.valid = True - new_creds.to_json.return_value = '{"token":"x"}' - flow = MagicMock() - flow.run_local_server.return_value = new_creds - - with NamedTemporaryFile("w", delete=False) as token_file: - token_file.write("{}") - token_path = token_file.name - try: - with patch.object( - gmail_module.Credentials, - "from_authorized_user_file", - return_value=expired_creds, - ): - with patch.object( - gmail_module.InstalledAppFlow, - "from_client_secrets_file", - return_value=flow, - ): - returned = _get_creds( - token_path, "credentials.json", ["scope"], 8080 - ) - finally: - os.remove(token_path) - self.assertEqual(returned, new_creds) - flow.run_local_server.assert_called_once() - - def testGetCredsRefreshesExpiredToken(self): - expired_creds = MagicMock() - expired_creds.valid = False - expired_creds.expired = True - expired_creds.refresh_token = "rt" - expired_creds.to_json.return_value = '{"token":"refreshed"}' - - with NamedTemporaryFile("w", delete=False) as token_file: - token_file.write("{}") - token_path = token_file.name - try: - with patch.object( - gmail_module.Credentials, - "from_authorized_user_file", - return_value=expired_creds, - ): - returned = _get_creds(token_path, "credentials.json", ["scope"], 8080) - finally: - os.remove(token_path) - - self.assertEqual(returned, expired_creds) - expired_creds.refresh.assert_called_once() - - def testCreateFolderConflictIgnored(self): - connection = self._build_connection() - labels_api = connection.service.users.return_value.labels.return_value - conflict = HttpError(Response({"status": "409"}), b"conflict") - labels_api.create.return_value.execute.side_effect = conflict - connection.create_folder("Existing") - - -class TestGraphConnection(unittest.TestCase): - def testLoadTokenMissing(self): - with TemporaryDirectory() as temp_dir: - missing_path = Path(temp_dir) / "missing-token-file" - self.assertIsNone(_load_token(missing_path)) - - def testLoadTokenExisting(self): - with NamedTemporaryFile("w", delete=False) as token_file: - token_file.write("serialized-auth-record") - token_path = token_file.name - try: - self.assertEqual(_load_token(Path(token_path)), "serialized-auth-record") - finally: - os.remove(token_path) - - def testGetAllMessagesPagination(self): - connection = MSGraphConnection.__new__(MSGraphConnection) - first_response = _FakeGraphResponse( - 200, {"value": [{"id": "1"}], "@odata.nextLink": "next-url"} - ) - second_response = _FakeGraphResponse(200, {"value": [{"id": "2"}]}) - connection._client = MagicMock() - connection._client.get.side_effect = [first_response, second_response] - messages = connection._get_all_messages("/url", batch_size=0, since=None) - self.assertEqual([msg["id"] for msg in messages], ["1", "2"]) - - def testGetAllMessagesInitialRequestFailure(self): - connection = MSGraphConnection.__new__(MSGraphConnection) - connection._client = MagicMock() - connection._client.get.return_value = _FakeGraphResponse(500, text="boom") - with self.assertRaises(RuntimeError): - connection._get_all_messages("/url", batch_size=0, since=None) - - def testGetAllMessagesRetriesTransientRequestErrors(self): - connection = MSGraphConnection.__new__(MSGraphConnection) - connection._client = MagicMock() - connection._client.get.side_effect = [ - graph_module.RequestException("connection reset"), - _FakeGraphResponse(200, {"value": [{"id": "1"}]}), - ] - with patch.object(graph_module, "sleep") as mocked_sleep: - messages = connection._get_all_messages("/url", batch_size=0, since=None) - self.assertEqual([msg["id"] for msg in messages], ["1"]) - mocked_sleep.assert_called_once_with( - graph_module.GRAPH_REQUEST_RETRY_DELAY_SECONDS - ) - - def testGetAllMessagesRaisesAfterRetryExhaustion(self): - connection = MSGraphConnection.__new__(MSGraphConnection) - connection._client = MagicMock() - connection._client.get.side_effect = graph_module.RequestException( - "connection reset" - ) - with patch.object(graph_module, "sleep") as mocked_sleep: - with self.assertRaises(graph_module.RequestException): - connection._get_all_messages("/url", batch_size=0, since=None) - self.assertEqual( - mocked_sleep.call_count, graph_module.GRAPH_REQUEST_RETRY_ATTEMPTS - 1 - ) - - def testGetAllMessagesNextPageFailure(self): - connection = MSGraphConnection.__new__(MSGraphConnection) - first_response = _FakeGraphResponse( - 200, {"value": [{"id": "1"}], "@odata.nextLink": "next-url"} - ) - second_response = _FakeGraphResponse(500, text="page-fail") - connection._client = MagicMock() - connection._client.get.side_effect = [first_response, second_response] - with self.assertRaises(RuntimeError): - connection._get_all_messages("/url", batch_size=0, since=None) - - def testGetAllMessagesHonorsBatchSizeLimit(self): - connection = MSGraphConnection.__new__(MSGraphConnection) - first_response = _FakeGraphResponse( - 200, - { - "value": [{"id": "1"}, {"id": "2"}], - "@odata.nextLink": "next-url", - }, - ) - connection._client = MagicMock() - connection._client.get.return_value = first_response - messages = connection._get_all_messages("/url", batch_size=2, since=None) - self.assertEqual([msg["id"] for msg in messages], ["1", "2"]) - connection._client.get.assert_called_once() - - def testFetchMessagesPassesSinceAndBatchSize(self): - connection = MSGraphConnection.__new__(MSGraphConnection) - connection.mailbox_name = "mailbox@example.com" - connection._find_folder_id_from_folder_path = MagicMock( - return_value="folder-id" - ) - connection._get_all_messages = MagicMock(return_value=[{"id": "1"}]) - self.assertEqual( - connection.fetch_messages("Inbox", since="2026-03-01", batch_size=5), ["1"] - ) - connection._get_all_messages.assert_called_once_with( - "/users/mailbox@example.com/mailFolders/folder-id/messages", - 5, - "2026-03-01", - ) - - def testFetchMessageMarksRead(self): - connection = MSGraphConnection.__new__(MSGraphConnection) - connection.mailbox_name = "mailbox@example.com" - connection._client = MagicMock() - connection._client.get.return_value = _FakeGraphResponse( - 200, text="email-content" - ) - connection.mark_message_read = MagicMock() - content = connection.fetch_message("123", mark_read=True) - self.assertEqual(content, "email-content") - connection.mark_message_read.assert_called_once_with("123") - - def testFindFolderIdNotFound(self): - connection = MSGraphConnection.__new__(MSGraphConnection) - connection.mailbox_name = "mailbox@example.com" - connection._client = MagicMock() - connection._client.get.return_value = _FakeGraphResponse(200, {"value": []}) - with self.assertRaises(RuntimeError): - connection._find_folder_id_with_parent("Missing", None) - - def testGetCacheArgsWithAuthRecord(self): - with NamedTemporaryFile("w", delete=False) as token_file: - token_file.write("serialized") - token_path = Path(token_file.name) - try: - with patch.object( - graph_module.AuthenticationRecord, - "deserialize", - return_value="auth_record", - ): - args = _get_cache_args(token_path, allow_unencrypted_storage=False) - self.assertIn("authentication_record", args) - finally: - os.remove(token_path) - - def testGenerateCredentialInvalid(self): - with self.assertRaises(RuntimeError): - _generate_credential( - "Nope", - Path("/tmp/token"), - client_id="x", - client_secret="y", - username="u", - password="p", - tenant_id="t", - allow_unencrypted_storage=False, - ) - - def testGenerateCredentialDeviceCode(self): - fake_credential = object() - with patch.object( - graph_module, "_get_cache_args", return_value={"cached": True} - ): - with patch.object( - graph_module, - "DeviceCodeCredential", - return_value=fake_credential, - ) as mocked: - result = _generate_credential( - graph_module.AuthMethod.DeviceCode.name, - Path("/tmp/token"), - client_id="cid", - client_secret="secret", - username="user", - password="pass", - tenant_id="tenant", - allow_unencrypted_storage=True, - ) - self.assertIs(result, fake_credential) - mocked.assert_called_once() - - def testGenerateCredentialClientSecret(self): - fake_credential = object() - with patch.object( - graph_module, "ClientSecretCredential", return_value=fake_credential - ) as mocked: - result = _generate_credential( - graph_module.AuthMethod.ClientSecret.name, - Path("/tmp/token"), - client_id="cid", - client_secret="secret", - username="user", - password="pass", - tenant_id="tenant", - allow_unencrypted_storage=False, - ) - self.assertIs(result, fake_credential) - mocked.assert_called_once_with( - client_id="cid", tenant_id="tenant", client_secret="secret" - ) - - def testGenerateCredentialCertificate(self): - fake_credential = object() - with patch.object( - graph_module, "CertificateCredential", return_value=fake_credential - ) as mocked: - result = _generate_credential( - graph_module.AuthMethod.Certificate.name, - Path("/tmp/token"), - client_id="cid", - client_secret="secret", - certificate_path="/tmp/cert.pem", - certificate_password="secret-pass", - username="user", - password="pass", - tenant_id="tenant", - allow_unencrypted_storage=False, - ) - self.assertIs(result, fake_credential) - mocked.assert_called_once_with( - client_id="cid", - tenant_id="tenant", - certificate_path="/tmp/cert.pem", - password="secret-pass", - ) - - def testGenerateCredentialCertificateRequiresPath(self): - with self.assertRaisesRegex( - ValueError, - "certificate_path is required when auth_method is 'Certificate'", - ): - _generate_credential( - graph_module.AuthMethod.Certificate.name, - Path("/tmp/token"), - client_id="cid", - client_secret=None, - certificate_path=None, - certificate_password="secret-pass", - username=None, - password=None, - tenant_id="tenant", - allow_unencrypted_storage=False, - ) - - def testInitUsesSharedMailboxScopes(self): - class FakeCredential: - def __init__(self): - self.authenticate = MagicMock(return_value="auth-record") - - fake_credential = FakeCredential() - with patch.object( - graph_module, "_generate_credential", return_value=fake_credential - ): - with patch.object(graph_module, "_cache_auth_record") as cache_auth: - with patch.object(graph_module, "GraphClient") as graph_client: - MSGraphConnection( - auth_method=graph_module.AuthMethod.DeviceCode.name, - mailbox="shared@example.com", - graph_url="https://graph.microsoft.com", - client_id="cid", - client_secret="secret", - username="owner@example.com", - password="pass", - tenant_id="tenant", - token_file="/tmp/token-file", - allow_unencrypted_storage=True, - ) - fake_credential.authenticate.assert_called_once_with( - scopes=["Mail.ReadWrite.Shared"] - ) - cache_auth.assert_called_once() - graph_client.assert_called_once() - self.assertEqual( - graph_client.call_args.kwargs.get("scopes"), ["Mail.ReadWrite.Shared"] - ) - - def testInitWithoutUsernameUsesDefaultMailReadWriteScope(self): - class FakeCredential: - def __init__(self): - self.authenticate = MagicMock(return_value="auth-record") - - fake_credential = FakeCredential() - with patch.object( - graph_module, "_generate_credential", return_value=fake_credential - ): - with patch.object(graph_module, "_cache_auth_record") as cache_auth: - with patch.object(graph_module, "GraphClient") as graph_client: - MSGraphConnection( - auth_method=graph_module.AuthMethod.DeviceCode.name, - mailbox="owner@example.com", - graph_url="https://graph.microsoft.com", - client_id="cid", - client_secret="secret", - username=None, - password=None, - tenant_id="tenant", - token_file="/tmp/token-file", - allow_unencrypted_storage=True, - ) - fake_credential.authenticate.assert_called_once_with(scopes=["Mail.ReadWrite"]) - cache_auth.assert_called_once() - graph_client.assert_called_once() - self.assertEqual( - graph_client.call_args.kwargs.get("scopes"), ["Mail.ReadWrite"] - ) - - def testInitCertificateAuthSkipsInteractiveAuthenticate(self): - class DummyCertificateCredential: - pass - - fake_credential = DummyCertificateCredential() - with patch.object( - graph_module, "CertificateCredential", DummyCertificateCredential - ): - with patch.object( - graph_module, "_generate_credential", return_value=fake_credential - ): - with patch.object(graph_module, "_cache_auth_record") as cache_auth: - with patch.object(graph_module, "GraphClient") as graph_client: - MSGraphConnection( - auth_method=graph_module.AuthMethod.Certificate.name, - mailbox="shared@example.com", - graph_url="https://graph.microsoft.com", - client_id="cid", - client_secret=None, - certificate_path="/tmp/cert.pem", - certificate_password="secret-pass", - username=None, - password=None, - tenant_id="tenant", - token_file="/tmp/token-file", - allow_unencrypted_storage=False, - ) - cache_auth.assert_not_called() - graph_client.assert_called_once() - self.assertNotIn("scopes", graph_client.call_args.kwargs) - - def testCreateFolderAndMoveErrors(self): - connection = MSGraphConnection.__new__(MSGraphConnection) - connection.mailbox_name = "mailbox@example.com" - connection._client = MagicMock() - connection._client.post.return_value = _FakeGraphResponse(500, {"error": "x"}) - connection._find_folder_id_from_folder_path = MagicMock(return_value="dest") - with self.assertRaises(RuntimeWarning): - connection.move_message("m1", "Archive") - connection._client.post.return_value = _FakeGraphResponse(409, {}) - connection.create_folder("Archive") - - def testMarkReadDeleteFailures(self): - connection = MSGraphConnection.__new__(MSGraphConnection) - connection.mailbox_name = "mailbox@example.com" - connection._client = MagicMock() - connection._client.patch.return_value = _FakeGraphResponse(500, {"error": "x"}) - with self.assertRaises(RuntimeWarning): - connection.mark_message_read("m1") - connection._client.delete.return_value = _FakeGraphResponse(500, {"error": "x"}) - with self.assertRaises(RuntimeWarning): - connection.delete_message("m1") - - -class TestImapConnection(unittest.TestCase): - def testDelegatesToImapClient(self): - with patch.object(imap_module, "IMAPClient") as mocked_client_cls: - mocked_client = MagicMock() - mocked_client_cls.return_value = mocked_client - connection = IMAPConnection( - "imap.example.com", user="user", password="pass" - ) - connection.create_folder("Archive") - mocked_client.create_folder.assert_called_once_with("Archive") - mocked_client.search.return_value = [1, 2] - self.assertEqual(connection.fetch_messages("INBOX"), [1, 2]) - mocked_client.select_folder.assert_called_with("INBOX") - connection.fetch_messages("INBOX", since="2026-03-01") - mocked_client.search.assert_called_with("SINCE 2026-03-01") - mocked_client.fetch_message.return_value = "raw-message" - self.assertEqual(connection.fetch_message(1), "raw-message") - connection.delete_message(7) - mocked_client.delete_messages.assert_called_once_with([7]) - connection.move_message(8, "Archive") - mocked_client.move_messages.assert_called_once_with([8], "Archive") - connection.keepalive() - mocked_client.noop.assert_called_once() - - def testWatchReconnectPath(self): - with patch.object(imap_module, "IMAPClient") as mocked_client_cls: - base_client = MagicMock() - base_client.host = "imap.example.com" - base_client.port = 993 - base_client.ssl = True - mocked_client_cls.return_value = base_client - connection = IMAPConnection( - "imap.example.com", user="user", password="pass" - ) - calls = {"count": 0} - - def fake_imap_constructor(*args, **kwargs): - idle_callback = kwargs.get("idle_callback") - if calls["count"] == 0: - calls["count"] += 1 - raise IMAPClientError("timeout") - if idle_callback is not None: - idle_callback(base_client) - raise _BreakLoop() - - callback = MagicMock() - with patch.object(imap_module, "sleep", return_value=None): - with patch.object( - imap_module, "IMAPClient", side_effect=fake_imap_constructor - ): - with self.assertRaises(_BreakLoop): - connection.watch(callback, check_timeout=1) - callback.assert_called_once_with(connection) - - class TestGmailAuthModes(unittest.TestCase): - @patch( - "parsedmarc.mail.gmail.service_account.Credentials.from_service_account_file" - ) - def testGetCredsServiceAccountWithoutSubject(self, mock_from_service_account_file): - service_creds = MagicMock() - service_creds.with_subject.return_value = MagicMock() - mock_from_service_account_file.return_value = service_creds - - creds = gmail_module._get_creds( - token_file=".token", - credentials_file="service-account.json", - scopes=["https://www.googleapis.com/auth/gmail.readonly"], - oauth2_port=8080, - auth_mode="service_account", - service_account_user=None, - ) - - self.assertIs(creds, service_creds) - mock_from_service_account_file.assert_called_once_with( - "service-account.json", - scopes=["https://www.googleapis.com/auth/gmail.readonly"], - ) - service_creds.with_subject.assert_not_called() - - @patch( - "parsedmarc.mail.gmail.service_account.Credentials.from_service_account_file" - ) - def testGetCredsServiceAccountWithSubject(self, mock_from_service_account_file): - base_creds = MagicMock() - delegated_creds = MagicMock() - base_creds.with_subject.return_value = delegated_creds - mock_from_service_account_file.return_value = base_creds - - creds = gmail_module._get_creds( - token_file=".token", - credentials_file="service-account.json", - scopes=["https://www.googleapis.com/auth/gmail.modify"], - oauth2_port=8080, - auth_mode="service_account", - service_account_user="dmarc@example.com", - ) - - self.assertIs(creds, delegated_creds) - base_creds.with_subject.assert_called_once_with("dmarc@example.com") - - def testGetCredsRejectsUnsupportedAuthMode(self): - with self.assertRaises(ValueError): - gmail_module._get_creds( - token_file=".token", - credentials_file="client-secret.json", - scopes=["https://www.googleapis.com/auth/gmail.modify"], - oauth2_port=8080, - auth_mode="unsupported", - ) - - @patch("parsedmarc.mail.gmail.Path.exists", return_value=True) - @patch("parsedmarc.mail.gmail.Credentials.from_authorized_user_file") - def testGetCredsInstalledAppStillUsesTokenFile( - self, mock_from_authorized_user_file, _mock_exists - ): - token_creds = MagicMock() - token_creds.valid = True - mock_from_authorized_user_file.return_value = token_creds - - creds = gmail_module._get_creds( - token_file=".token", - credentials_file="client-secret.json", - scopes=["https://www.googleapis.com/auth/gmail.modify"], - oauth2_port=8080, - auth_mode="installed_app", - ) - - self.assertIs(creds, token_creds) - mock_from_authorized_user_file.assert_called_once_with( - ".token", - ["https://www.googleapis.com/auth/gmail.modify"], - ) - - @patch("parsedmarc.mail.gmail.GmailConnection._find_label_id_for_label") - @patch("parsedmarc.mail.gmail.build") - @patch("parsedmarc.mail.gmail._get_creds") - def testGmailConnectionPassesAuthModeAndDelegatedUser( - self, mock_get_creds, mock_build, mock_find_label - ): - mock_get_creds.return_value = MagicMock() - mock_build.return_value = MagicMock() - mock_find_label.return_value = "INBOX" - - gmail_module.GmailConnection( - token_file=".token", - credentials_file="service-account.json", - scopes=["https://www.googleapis.com/auth/gmail.modify"], - include_spam_trash=False, - reports_folder="INBOX", - oauth2_port=8080, - paginate_messages=True, - auth_mode="service_account", - service_account_user="dmarc@example.com", - ) - - mock_get_creds.assert_called_once_with( - ".token", - "service-account.json", - ["https://www.googleapis.com/auth/gmail.modify"], - 8080, - auth_mode="service_account", - service_account_user="dmarc@example.com", - ) - @patch("parsedmarc.cli.get_dmarc_reports_from_mailbox") @patch("parsedmarc.cli.GmailConnection") def testCliPassesGmailServiceAccountAuthSettings( @@ -1399,62 +719,6 @@ scopes = https://www.googleapis.com/auth/gmail.modify ) -class TestImapFallbacks(unittest.TestCase): - def testDeleteSuccessDoesNotUseFallback(self): - connection = IMAPConnection.__new__(IMAPConnection) - connection._client = MagicMock() - connection.delete_message(42) - connection._client.delete_messages.assert_called_once_with([42]) - connection._client.add_flags.assert_not_called() - connection._client.expunge.assert_not_called() - - def testDeleteFallbackUsesFlagsAndExpunge(self): - connection = IMAPConnection.__new__(IMAPConnection) - connection._client = MagicMock() - connection._client.delete_messages.side_effect = IMAPClientError("uid expunge") - connection.delete_message(42) - connection._client.add_flags.assert_called_once_with( - [42], [r"\Deleted"], silent=True - ) - connection._client.expunge.assert_called_once_with() - - def testDeleteFallbackErrorPropagates(self): - connection = IMAPConnection.__new__(IMAPConnection) - connection._client = MagicMock() - connection._client.delete_messages.side_effect = IMAPClientError("uid expunge") - connection._client.add_flags.side_effect = IMAPClientError("flag failed") - with self.assertRaises(IMAPClientError): - connection.delete_message(42) - - def testMoveSuccessDoesNotUseFallback(self): - connection = IMAPConnection.__new__(IMAPConnection) - connection._client = MagicMock() - with patch.object(connection, "delete_message") as delete_mock: - connection.move_message(99, "Archive") - connection._client.move_messages.assert_called_once_with([99], "Archive") - connection._client.copy.assert_not_called() - delete_mock.assert_not_called() - - def testMoveFallbackCopiesThenDeletes(self): - connection = IMAPConnection.__new__(IMAPConnection) - connection._client = MagicMock() - connection._client.move_messages.side_effect = IMAPClientError("move failed") - with patch.object(connection, "delete_message") as delete_mock: - connection.move_message(99, "Archive") - connection._client.copy.assert_called_once_with([99], "Archive") - delete_mock.assert_called_once_with(99) - - def testMoveFallbackCopyErrorPropagates(self): - connection = IMAPConnection.__new__(IMAPConnection) - connection._client = MagicMock() - connection._client.move_messages.side_effect = IMAPClientError("move failed") - connection._client.copy.side_effect = IMAPClientError("copy failed") - with patch.object(connection, "delete_message") as delete_mock: - with self.assertRaises(IMAPClientError): - connection.move_message(99, "Archive") - delete_mock.assert_not_called() - - class TestMailboxWatchSince(unittest.TestCase): def setUp(self): from parsedmarc.log import logger as _logger @@ -1740,83 +1004,6 @@ user = owner@example.com mock_get_mailbox_reports.assert_not_called() -class _FakeGraphClient: - def get(self, url, params=None): - if "/mailFolders/inbox?$select=id,displayName" in url: - return _FakeGraphResponse(200, {"id": "inbox-id", "displayName": "Inbox"}) - - if "/mailFolders?$filter=displayName eq 'Inbox'" in url: - return _FakeGraphResponse( - 404, - { - "error": { - "code": "ErrorItemNotFound", - "message": "Default folder Root not found.", - } - }, - ) - - if "/mailFolders?$filter=displayName eq 'Custom'" in url: - return _FakeGraphResponse( - 404, - { - "error": { - "code": "ErrorItemNotFound", - "message": "Default folder Root not found.", - } - }, - ) - - return _FakeGraphResponse(404, {"error": {"code": "NotFound"}}) - - -class TestMSGraphFolderFallback(unittest.TestCase): - def testWellKnownFolderFallback(self): - connection = MSGraphConnection.__new__(MSGraphConnection) - connection.mailbox_name = "shared@example.com" - connection._client = _FakeGraphClient() # type: ignore[assignment] - connection._request_with_retries = MagicMock( - side_effect=lambda method_name, *args, **kwargs: getattr( - connection._client, method_name - )(*args, **kwargs) - ) - - folder_id = connection._find_folder_id_with_parent("Inbox", None) - self.assertEqual(folder_id, "inbox-id") - connection._request_with_retries.assert_any_call( - "get", - "/users/shared@example.com/mailFolders?$filter=displayName eq 'Inbox'", - ) - connection._request_with_retries.assert_any_call( - "get", "/users/shared@example.com/mailFolders/inbox?$select=id,displayName" - ) - - def testUnknownFolderStillFails(self): - connection = MSGraphConnection.__new__(MSGraphConnection) - connection.mailbox_name = "shared@example.com" - connection._client = _FakeGraphClient() # type: ignore[assignment] - connection._request_with_retries = MagicMock( - side_effect=lambda method_name, *args, **kwargs: getattr( - connection._client, method_name - )(*args, **kwargs) - ) - - with self.assertRaises(RuntimeWarning): - connection._find_folder_id_from_folder_path("Custom") - - def testSingleSegmentPathAvoidsExtraWellKnownLookupWhenListingSucceeds(self): - connection = MSGraphConnection.__new__(MSGraphConnection) - connection.mailbox_name = "shared@example.com" - connection._find_folder_id_with_parent = MagicMock(return_value="custom-id") - connection._get_well_known_folder_id = MagicMock(return_value="inbox-id") - - folder_id = connection._find_folder_id_from_folder_path("Inbox") - - self.assertEqual(folder_id, "custom-id") - connection._find_folder_id_with_parent.assert_called_once_with("Inbox", None) - connection._get_well_known_folder_id.assert_not_called() - - class TestMSGraphCliValidation(unittest.TestCase): @patch("parsedmarc.cli.get_dmarc_reports_from_mailbox") @patch("parsedmarc.cli.MSGraphConnection") @@ -2667,174 +1854,6 @@ password = test-password self.assertNotIn("unmapped-1", report_ids) -class TestMaildirConnection(unittest.TestCase): - """Tests for MaildirConnection subdirectory creation.""" - - def test_create_subdirs_when_missing(self): - """maildir_create=True creates cur/new/tmp in an empty directory.""" - from parsedmarc.mail.maildir import MaildirConnection - - with TemporaryDirectory() as d: - for subdir in ("cur", "new", "tmp"): - self.assertFalse(os.path.exists(os.path.join(d, subdir))) - - conn = MaildirConnection(d, maildir_create=True) - - for subdir in ("cur", "new", "tmp"): - self.assertTrue(os.path.isdir(os.path.join(d, subdir))) - # Should be able to list messages without error - self.assertEqual(conn.fetch_messages("INBOX"), []) - - def test_create_subdirs_idempotent(self): - """maildir_create=True is safe when subdirs already exist.""" - from parsedmarc.mail.maildir import MaildirConnection - - with TemporaryDirectory() as d: - for subdir in ("cur", "new", "tmp"): - os.makedirs(os.path.join(d, subdir)) - - # Should not raise - conn = MaildirConnection(d, maildir_create=True) - self.assertEqual(conn.fetch_messages("INBOX"), []) - - def test_no_create_raises_on_missing_subdirs(self): - """maildir_create=False does not create subdirs; keys() fails.""" - from parsedmarc.mail.maildir import MaildirConnection - - with TemporaryDirectory() as d: - conn = MaildirConnection(d, maildir_create=False) - - with self.assertRaises(FileNotFoundError): - conn.fetch_messages("INBOX") - - def test_fetch_and_delete_message(self): - """Round-trip: add a message, fetch it, delete it.""" - from parsedmarc.mail.maildir import MaildirConnection - - with TemporaryDirectory() as d: - conn = MaildirConnection(d, maildir_create=True) - - # Add a message via the underlying client - msg_key = conn._client.add("From: test@example.com\n\nHello") - keys = conn.fetch_messages("INBOX") - self.assertIn(msg_key, keys) - - content = conn.fetch_message(msg_key) - self.assertIn("test@example.com", content) - - conn.delete_message(msg_key) - self.assertEqual(conn.fetch_messages("INBOX"), []) - - def test_move_message_creates_subfolder(self): - """move_message auto-creates the destination subfolder.""" - from parsedmarc.mail.maildir import MaildirConnection - - with TemporaryDirectory() as d: - conn = MaildirConnection(d, maildir_create=True) - - msg_key = conn._client.add("From: test@example.com\n\nHello") - conn.move_message(msg_key, "archive") - - # Original should be gone - self.assertEqual(conn.fetch_messages("INBOX"), []) - # Archive subfolder should have the message - self.assertIn("archive", conn._subfolder_client) - self.assertEqual(len(conn._subfolder_client["archive"].keys()), 1) - - -class TestMaildirReportsFolder(unittest.TestCase): - """Tests for Maildir reports_folder support in fetch_messages.""" - - def test_fetch_from_subfolder(self): - """fetch_messages with a subfolder name reads from that subfolder.""" - from parsedmarc.mail.maildir import MaildirConnection - - with TemporaryDirectory() as d: - conn = MaildirConnection(d, maildir_create=True) - - # Add message to a subfolder - subfolder = conn._client.add_folder("reports") - msg_key = subfolder.add("From: test@example.com\n\nSubfolder msg") - - # Root should be empty - self.assertEqual(conn.fetch_messages("INBOX"), []) - - # Subfolder should have the message - keys = conn.fetch_messages("reports") - self.assertIn(msg_key, keys) - - def test_fetch_message_uses_active_folder(self): - """fetch_message reads from the folder set by fetch_messages.""" - from parsedmarc.mail.maildir import MaildirConnection - - with TemporaryDirectory() as d: - conn = MaildirConnection(d, maildir_create=True) - - subfolder = conn._client.add_folder("reports") - msg_key = subfolder.add("From: sub@example.com\n\nIn subfolder") - - conn.fetch_messages("reports") - content = conn.fetch_message(msg_key) - self.assertIn("sub@example.com", content) - - def test_delete_message_uses_active_folder(self): - """delete_message removes from the folder set by fetch_messages.""" - from parsedmarc.mail.maildir import MaildirConnection - - with TemporaryDirectory() as d: - conn = MaildirConnection(d, maildir_create=True) - - subfolder = conn._client.add_folder("reports") - msg_key = subfolder.add("From: del@example.com\n\nDelete me") - - conn.fetch_messages("reports") - conn.delete_message(msg_key) - self.assertEqual(conn.fetch_messages("reports"), []) - - def test_move_message_from_subfolder(self): - """move_message works when active folder is a subfolder.""" - from parsedmarc.mail.maildir import MaildirConnection - - with TemporaryDirectory() as d: - conn = MaildirConnection(d, maildir_create=True) - - subfolder = conn._client.add_folder("reports") - msg_key = subfolder.add("From: move@example.com\n\nMove me") - - conn.fetch_messages("reports") - conn.move_message(msg_key, "archive") - - # Source should be empty - self.assertEqual(conn.fetch_messages("reports"), []) - # Destination should have the message - archive_keys = conn.fetch_messages("archive") - self.assertEqual(len(archive_keys), 1) - - def test_inbox_reads_root(self): - """INBOX reads from the top-level Maildir.""" - from parsedmarc.mail.maildir import MaildirConnection - - with TemporaryDirectory() as d: - conn = MaildirConnection(d, maildir_create=True) - - msg_key = conn._client.add("From: root@example.com\n\nRoot msg") - - keys = conn.fetch_messages("INBOX") - self.assertIn(msg_key, keys) - - def test_empty_folder_reads_root(self): - """Empty string reports_folder reads from the top-level Maildir.""" - from parsedmarc.mail.maildir import MaildirConnection - - with TemporaryDirectory() as d: - conn = MaildirConnection(d, maildir_create=True) - - msg_key = conn._client.add("From: root@example.com\n\nRoot msg") - - keys = conn.fetch_messages("") - self.assertIn(msg_key, keys) - - class TestConfigAliases(unittest.TestCase): """Tests for config key aliases (env var friendly short names).""" @@ -2932,63 +1951,6 @@ class TestConfigAliases(unittest.TestCase): ) -class TestMaildirUidHandling(unittest.TestCase): - """Tests for Maildir UID mismatch handling in Docker-like environments.""" - - def test_uid_mismatch_warns_instead_of_crashing(self): - """UID mismatch logs a warning instead of raising an exception.""" - from parsedmarc.mail.maildir import MaildirConnection - - with TemporaryDirectory() as d: - # Create subdirs so Maildir works - for subdir in ("cur", "new", "tmp"): - os.makedirs(os.path.join(d, subdir)) - - # Mock os.stat to return a different UID than os.getuid - fake_stat = os.stat(d) - with ( - patch("parsedmarc.mail.maildir.os.stat") as mock_stat, - patch("parsedmarc.mail.maildir.os.getuid", return_value=9999), - ): - mock_stat.return_value = fake_stat - # Should not raise — just warn - conn = MaildirConnection(d, maildir_create=False) - self.assertEqual(conn.fetch_messages("INBOX"), []) - - def test_uid_match_no_warning(self): - """No warning when UIDs match.""" - from parsedmarc.mail.maildir import MaildirConnection - - with TemporaryDirectory() as d: - conn = MaildirConnection(d, maildir_create=True) - self.assertEqual(conn.fetch_messages("INBOX"), []) - - def test_stat_failure_does_not_crash(self): - """If os.stat fails on the maildir path, we don't crash.""" - from parsedmarc.mail.maildir import MaildirConnection - - with TemporaryDirectory() as d: - for subdir in ("cur", "new", "tmp"): - os.makedirs(os.path.join(d, subdir)) - - original_stat = os.stat - - def stat_that_fails_once(path, *args, **kwargs): - """Fail on the first call (UID check), pass through after.""" - stat_that_fails_once.calls += 1 - if stat_that_fails_once.calls == 1: - raise OSError("no stat") - return original_stat(path, *args, **kwargs) - - stat_that_fails_once.calls = 0 - - with patch( - "parsedmarc.mail.maildir.os.stat", side_effect=stat_that_fails_once - ): - conn = MaildirConnection(d, maildir_create=False) - self.assertEqual(conn.fetch_messages("INBOX"), []) - - class TestExpandPath(unittest.TestCase): """Tests for _expand_path config path expansion.""" @@ -3021,39 +1983,6 @@ class TestExpandPath(unittest.TestCase): self.assertEqual(_expand_path("relative/path"), "relative/path") -class TestTokenParentDirCreation(unittest.TestCase): - """Tests for parent directory creation when writing token files.""" - - def test_graph_cache_creates_parent_dirs(self): - from parsedmarc.mail.graph import _cache_auth_record - - with TemporaryDirectory() as d: - token_path = Path(d) / "subdir" / "nested" / ".token" - self.assertFalse(token_path.parent.exists()) - - mock_record = MagicMock() - mock_record.serialize.return_value = "serialized-token" - - _cache_auth_record(mock_record, token_path) - - self.assertTrue(token_path.exists()) - self.assertEqual(token_path.read_text(), "serialized-token") - - def test_gmail_token_write_creates_parent_dirs(self): - """Gmail token write creates parent directories.""" - with TemporaryDirectory() as d: - token_path = Path(d) / "deep" / "nested" / "token.json" - self.assertFalse(token_path.parent.exists()) - - # Directly test the mkdir + open pattern - token_path.parent.mkdir(parents=True, exist_ok=True) - with token_path.open("w") as f: - f.write('{"token": "test"}') - - self.assertTrue(token_path.exists()) - self.assertEqual(token_path.read_text(), '{"token": "test"}') - - class TestEnvVarConfig(unittest.TestCase): """Tests for environment variable configuration support."""