Offload mailbox layer to mailsuite>=2.0.0 (#741)

mailsuite 2.0.0 extracted the IMAP, Microsoft Graph, Gmail, and Maildir
connections out of parsedmarc into mailsuite.mailbox so other projects
can reuse the same provider-agnostic interface. Replace the
parsedmarc/mail submodules with a thin re-export of mailsuite.mailbox
and drop the duplicated implementations.

Per the migration note in seanthegeek/mailsuite#22, pass
token_cache_name="parsedmarc" so existing AuthenticationRecord caches
on disk continue to work without re-prompting users to authenticate.
The existing graph_url config knob is forwarded unchanged.

Drop direct dependencies that are now installed transitively via
mailsuite[gmail,msgraph] (msgraph-core, imapclient, google-*). The
extras are pulled in non-optionally so Gmail and Microsoft Graph
support remain available out of the box.

Drop nine test classes that were exercising mailsuite-side
implementation internals (TestGmailConnection, TestGraphConnection,
TestImapConnection, the _get_creds/_generate_credential half of
TestGmailAuthModes, TestImapFallbacks, TestMSGraphFolderFallback,
TestMaildirConnection, TestMaildirReportsFolder, TestMaildirUidHandling,
TestTokenParentDirCreation); these are mailsuite's tests now. The CLI
integration tests that mock parsedmarc.cli.{IMAP,Gmail,MSGraph}Connection
are kept.

Co-authored-by: Sean Whalen <seanthegeek@users.noreply.github.com>
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Sean Whalen
2026-04-28 00:58:36 -04:00
committed by GitHub
parent 900ee22525
commit 5d816a4e56
11 changed files with 33 additions and 1878 deletions
+8
View File
@@ -1,5 +1,13 @@
# Changelog
## 9.11.0
### Changes
- **Mailbox backends now live in `mailsuite>=2.0.0`.** The `IMAPConnection`, `MSGraphConnection`, `GmailConnection`, `MaildirConnection`, and `MailboxConnection` implementations were extracted into [mailsuite 2.0.0](https://github.com/seanthegeek/mailsuite/releases/tag/2.0.0) so other projects can reuse the same provider-agnostic interface. parsedmarc's `parsedmarc.mail` package is now a thin re-export of `mailsuite.mailbox`; existing imports (`from parsedmarc.mail import IMAPConnection`, etc.) continue to work unchanged. The CLI passes `token_cache_name="parsedmarc"` and forwards the existing `[msgraph] graph_url` config knob so cached `AuthenticationRecord`s and tokens carry over without re-prompting.
- **MSGraph backend rewritten on `msgraph-sdk` (kiota-based).** mailsuite 2.0 replaces the retired `msgraph-core==0.2.2` REST wrapper with the supported `msgraph-sdk` client. End-user behavior is unchanged.
- **Direct dependencies on `msgraph-core`, `imapclient`, `google-api-core`, `google-api-python-client`, `google-auth-httplib2`, `google-auth-oauthlib`, and `google-auth` removed.** They are now installed transitively via `mailsuite[gmail,msgraph]>=2.0.0`, which is included as a non-optional dependency so Gmail and Microsoft Graph support remain available out of the box.
## 9.10.3
### Fixed
+2 -1
View File
@@ -43,12 +43,12 @@ from parsedmarc import (
)
from parsedmarc.log import logger
from parsedmarc.mail import (
AuthMethod,
GmailConnection,
IMAPConnection,
MaildirConnection,
MSGraphConnection,
)
from parsedmarc.mail.graph import AuthMethod
from parsedmarc.types import ParsingResults
from parsedmarc.utils import (
InvalidIPinfoAPIKey,
@@ -2150,6 +2150,7 @@ def _main():
token_file=opts.graph_token_file,
allow_unencrypted_storage=bool(opts.graph_allow_unencrypted_storage),
graph_url=opts.graph_url,
token_cache_name="parsedmarc",
)
except Exception:
+1 -1
View File
@@ -1,4 +1,4 @@
__version__ = "9.10.3"
__version__ = "9.11.0"
USER_AGENT = f"parsedmarc/{__version__}"
+20 -7
View File
@@ -1,13 +1,26 @@
from parsedmarc.mail.mailbox_connection import MailboxConnection
from parsedmarc.mail.graph import MSGraphConnection
from parsedmarc.mail.gmail import GmailConnection
from parsedmarc.mail.imap import IMAPConnection
from parsedmarc.mail.maildir import MaildirConnection
# -*- coding: utf-8 -*-
"""Mailbox connections for parsedmarc.
The implementations live in :mod:`mailsuite.mailbox` (extracted from
parsedmarc in mailsuite 2.0.0). This module re-exports them so
``parsedmarc.mail`` remains a stable import path for downstream consumers.
"""
from mailsuite.mailbox import (
GmailConnection,
IMAPConnection,
MailboxConnection,
MaildirConnection,
MSGraphConnection,
)
from mailsuite.mailbox.graph import AuthMethod
__all__ = [
"MailboxConnection",
"MSGraphConnection",
"AuthMethod",
"GmailConnection",
"IMAPConnection",
"MailboxConnection",
"MaildirConnection",
"MSGraphConnection",
]
-196
View File
@@ -1,196 +0,0 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
from base64 import urlsafe_b64decode
from functools import lru_cache
from pathlib import Path
from time import sleep
from typing import List
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google.oauth2 import service_account
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from parsedmarc.log import logger
from parsedmarc.mail.mailbox_connection import MailboxConnection
def _get_creds(
token_file,
credentials_file,
scopes,
oauth2_port,
auth_mode="installed_app",
service_account_user=None,
):
normalized_auth_mode = (auth_mode or "installed_app").strip().lower()
if normalized_auth_mode == "service_account":
creds = service_account.Credentials.from_service_account_file(
credentials_file,
scopes=scopes,
)
if service_account_user:
creds = creds.with_subject(service_account_user)
return creds
if normalized_auth_mode != "installed_app":
raise ValueError(
f"Unsupported Gmail auth_mode '{auth_mode}'. "
"Expected 'installed_app' or 'service_account'."
)
creds = None
if Path(token_file).exists():
creds = Credentials.from_authorized_user_file(token_file, scopes)
# If there are no (valid) credentials available, let the user log in.
if not creds or not creds.valid:
if creds and creds.expired and creds.refresh_token:
creds.refresh(Request())
else:
flow = InstalledAppFlow.from_client_secrets_file(credentials_file, scopes)
creds = flow.run_local_server(open_browser=False, oauth2_port=oauth2_port)
# Save the credentials for the next run
Path(token_file).parent.mkdir(parents=True, exist_ok=True)
with Path(token_file).open("w") as token:
token.write(creds.to_json())
return creds
class GmailConnection(MailboxConnection):
def __init__(
self,
token_file: str,
credentials_file: str,
scopes: List[str],
include_spam_trash: bool,
reports_folder: str,
oauth2_port: int,
paginate_messages: bool,
auth_mode: str = "installed_app",
service_account_user: str | None = None,
):
creds = _get_creds(
token_file,
credentials_file,
scopes,
oauth2_port,
auth_mode=auth_mode,
service_account_user=service_account_user,
)
self.service = build("gmail", "v1", credentials=creds)
self.include_spam_trash = include_spam_trash
self.reports_label_id = self._find_label_id_for_label(reports_folder)
self.paginate_messages = paginate_messages
def create_folder(self, folder_name: str):
# Gmail doesn't support the name Archive
if folder_name == "Archive":
return
logger.debug(f"Creating label {folder_name}")
request_body = {"name": folder_name, "messageListVisibility": "show"}
try:
self.service.users().labels().create(
userId="me", body=request_body
).execute()
except HttpError as e:
if e.status_code == 409:
logger.debug(f"Folder {folder_name} already exists, skipping creation")
else:
raise e
def _fetch_all_message_ids(self, reports_label_id, page_token=None, since=None):
if since:
results = (
self.service.users()
.messages()
.list(
userId="me",
includeSpamTrash=self.include_spam_trash,
labelIds=[reports_label_id],
pageToken=page_token,
q=f"after:{since}",
)
.execute()
)
else:
results = (
self.service.users()
.messages()
.list(
userId="me",
includeSpamTrash=self.include_spam_trash,
labelIds=[reports_label_id],
pageToken=page_token,
)
.execute()
)
messages = results.get("messages", [])
for message in messages:
yield message["id"]
if "nextPageToken" in results and self.paginate_messages:
yield from self._fetch_all_message_ids(
reports_label_id, results["nextPageToken"]
)
def fetch_messages(self, reports_folder: str, **kwargs) -> List[str]:
reports_label_id = self._find_label_id_for_label(reports_folder)
since = kwargs.get("since")
if since:
return [
id for id in self._fetch_all_message_ids(reports_label_id, since=since)
]
else:
return [id for id in self._fetch_all_message_ids(reports_label_id)]
def fetch_message(self, message_id) -> str:
msg = (
self.service.users()
.messages()
.get(userId="me", id=message_id, format="raw")
.execute()
)
return urlsafe_b64decode(msg["raw"]).decode(errors="replace")
def delete_message(self, message_id: str):
self.service.users().messages().delete(userId="me", id=message_id).execute()
def move_message(self, message_id: str, folder_name: str):
label_id = self._find_label_id_for_label(folder_name)
logger.debug(f"Moving message UID {message_id} to {folder_name}")
request_body = {
"addLabelIds": [label_id],
"removeLabelIds": [self.reports_label_id],
}
self.service.users().messages().modify(
userId="me", id=message_id, body=request_body
).execute()
def keepalive(self):
# Not needed
pass
def watch(self, check_callback, check_timeout, config_reloading=None):
"""Checks the mailbox for new messages every n seconds"""
while True:
if config_reloading and config_reloading():
return
sleep(check_timeout)
if config_reloading and config_reloading():
return
check_callback(self)
@lru_cache(maxsize=10)
def _find_label_id_for_label(self, label_name: str) -> str:
results = self.service.users().labels().list(userId="me").execute()
labels = results.get("labels", [])
for label in labels:
if label_name == label["id"] or label_name == label["name"]:
return label["id"]
return ""
-339
View File
@@ -1,339 +0,0 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
from enum import Enum
from functools import lru_cache
from pathlib import Path
from time import sleep
from typing import Any, List, Optional, Union
from azure.identity import (
UsernamePasswordCredential,
DeviceCodeCredential,
ClientSecretCredential,
CertificateCredential,
TokenCachePersistenceOptions,
AuthenticationRecord,
)
from msgraph.core import GraphClient
from requests.exceptions import RequestException
from parsedmarc.log import logger
from parsedmarc.mail.mailbox_connection import MailboxConnection
GRAPH_REQUEST_RETRY_ATTEMPTS = 3
GRAPH_REQUEST_RETRY_DELAY_SECONDS = 5
class AuthMethod(Enum):
DeviceCode = 1
UsernamePassword = 2
ClientSecret = 3
Certificate = 4
def _get_cache_args(token_path: Path, allow_unencrypted_storage):
cache_args: dict[str, Any] = {
"cache_persistence_options": TokenCachePersistenceOptions(
name="parsedmarc", allow_unencrypted_storage=allow_unencrypted_storage
)
}
auth_record = _load_token(token_path)
if auth_record:
cache_args["authentication_record"] = AuthenticationRecord.deserialize(
auth_record
)
return cache_args
def _load_token(token_path: Path) -> Optional[str]:
if not token_path.exists():
return None
with token_path.open() as token_file:
return token_file.read()
def _cache_auth_record(record: AuthenticationRecord, token_path: Path):
token = record.serialize()
token_path.parent.mkdir(parents=True, exist_ok=True)
with token_path.open("w") as token_file:
token_file.write(token)
def _generate_credential(auth_method: str, token_path: Path, **kwargs):
if auth_method == AuthMethod.DeviceCode.name:
credential = DeviceCodeCredential(
client_id=kwargs["client_id"],
disable_automatic_authentication=True,
tenant_id=kwargs["tenant_id"],
**_get_cache_args(
token_path,
allow_unencrypted_storage=kwargs["allow_unencrypted_storage"],
),
)
elif auth_method == AuthMethod.UsernamePassword.name:
credential = UsernamePasswordCredential(
client_id=kwargs["client_id"],
client_credential=kwargs["client_secret"],
disable_automatic_authentication=True,
username=kwargs["username"],
password=kwargs["password"],
**_get_cache_args(
token_path,
allow_unencrypted_storage=kwargs["allow_unencrypted_storage"],
),
)
elif auth_method == AuthMethod.ClientSecret.name:
credential = ClientSecretCredential(
client_id=kwargs["client_id"],
tenant_id=kwargs["tenant_id"],
client_secret=kwargs["client_secret"],
)
elif auth_method == AuthMethod.Certificate.name:
cert_path = kwargs.get("certificate_path")
if not cert_path:
raise ValueError(
"certificate_path is required when auth_method is 'Certificate'"
)
credential = CertificateCredential(
client_id=kwargs["client_id"],
tenant_id=kwargs["tenant_id"],
certificate_path=cert_path,
password=kwargs.get("certificate_password"),
)
else:
raise RuntimeError(f"Auth method {auth_method} not found")
return credential
class MSGraphConnection(MailboxConnection):
_WELL_KNOWN_FOLDERS = {
"inbox": "inbox",
"archive": "archive",
"drafts": "drafts",
"sentitems": "sentitems",
"deleteditems": "deleteditems",
"junkemail": "junkemail",
}
def __init__(
self,
auth_method: str,
mailbox: str,
graph_url: str,
client_id: str,
client_secret: Optional[str],
username: Optional[str],
password: Optional[str],
tenant_id: str,
token_file: str,
allow_unencrypted_storage: bool,
certificate_path: Optional[str] = None,
certificate_password: Optional[Union[str, bytes]] = None,
):
token_path = Path(token_file)
credential = _generate_credential(
auth_method,
client_id=client_id,
client_secret=client_secret,
certificate_path=certificate_path,
certificate_password=certificate_password,
username=username,
password=password,
tenant_id=tenant_id,
token_path=token_path,
allow_unencrypted_storage=allow_unencrypted_storage,
)
client_params = {
"credential": credential,
"cloud": graph_url,
}
if not isinstance(credential, (ClientSecretCredential, CertificateCredential)):
scopes = ["Mail.ReadWrite"]
# Detect if mailbox is shared
if mailbox and username and username != mailbox:
scopes = ["Mail.ReadWrite.Shared"]
auth_record = credential.authenticate(scopes=scopes)
_cache_auth_record(auth_record, token_path)
client_params["scopes"] = scopes
self._client = GraphClient(**client_params)
self.mailbox_name = mailbox
def _request_with_retries(self, method_name: str, *args, **kwargs):
for attempt in range(1, GRAPH_REQUEST_RETRY_ATTEMPTS + 1):
try:
return getattr(self._client, method_name)(*args, **kwargs)
except RequestException as error:
if attempt == GRAPH_REQUEST_RETRY_ATTEMPTS:
raise
logger.warning(
"Transient MS Graph %s error on attempt %s/%s: %s",
method_name.upper(),
attempt,
GRAPH_REQUEST_RETRY_ATTEMPTS,
error,
)
sleep(GRAPH_REQUEST_RETRY_DELAY_SECONDS)
raise RuntimeError("no retry attempts configured")
def create_folder(self, folder_name: str):
sub_url = ""
path_parts = folder_name.split("/")
if len(path_parts) > 1: # Folder is a subFolder
parent_folder_id = None
for folder in path_parts[:-1]:
parent_folder_id = self._find_folder_id_with_parent(
folder, parent_folder_id
)
sub_url = f"/{parent_folder_id}/childFolders"
folder_name = path_parts[-1]
request_body = {"displayName": folder_name}
request_url = f"/users/{self.mailbox_name}/mailFolders{sub_url}"
resp = self._request_with_retries("post", request_url, json=request_body)
if resp.status_code == 409:
logger.debug(f"Folder {folder_name} already exists, skipping creation")
elif resp.status_code == 201:
logger.debug(f"Created folder {folder_name}")
else:
logger.warning(f"Unknown response {resp.status_code} {resp.json()}")
def fetch_messages(self, reports_folder: str, **kwargs) -> List[str]:
"""Returns a list of message UIDs in the specified folder"""
folder_id = self._find_folder_id_from_folder_path(reports_folder)
url = f"/users/{self.mailbox_name}/mailFolders/{folder_id}/messages"
since = kwargs.get("since")
if not since:
since = None
batch_size = kwargs.get("batch_size")
if not batch_size:
batch_size = 0
emails = self._get_all_messages(url, batch_size, since)
return [email["id"] for email in emails]
def _get_all_messages(self, url, batch_size, since):
messages: list
params: dict[str, Union[str, int]] = {"$select": "id"}
if since:
params["$filter"] = f"receivedDateTime ge {since}"
if batch_size and batch_size > 0:
params["$top"] = batch_size
else:
params["$top"] = 100
result = self._request_with_retries("get", url, params=params)
if result.status_code != 200:
raise RuntimeError(f"Failed to fetch messages {result.text}")
messages = result.json()["value"]
# Loop if next page is present and not obtained message limit.
while "@odata.nextLink" in result.json() and (
since is not None or (batch_size == 0 or batch_size - len(messages) > 0)
):
result = self._request_with_retries("get", result.json()["@odata.nextLink"])
if result.status_code != 200:
raise RuntimeError(f"Failed to fetch messages {result.text}")
messages.extend(result.json()["value"])
return messages
def mark_message_read(self, message_id: str):
"""Marks a message as read"""
url = f"/users/{self.mailbox_name}/messages/{message_id}"
resp = self._request_with_retries("patch", url, json={"isRead": "true"})
if resp.status_code != 200:
raise RuntimeWarning(
f"Failed to mark message read{resp.status_code}: {resp.json()}"
)
def fetch_message(self, message_id: str, **kwargs):
url = f"/users/{self.mailbox_name}/messages/{message_id}/$value"
result = self._request_with_retries("get", url)
if result.status_code != 200:
raise RuntimeWarning(
f"Failed to fetch message{result.status_code}: {result.json()}"
)
mark_read = kwargs.get("mark_read")
if mark_read:
self.mark_message_read(message_id)
return result.text
def delete_message(self, message_id: str):
url = f"/users/{self.mailbox_name}/messages/{message_id}"
resp = self._request_with_retries("delete", url)
if resp.status_code != 204:
raise RuntimeWarning(
f"Failed to delete message {resp.status_code}: {resp.json()}"
)
def move_message(self, message_id: str, folder_name: str):
folder_id = self._find_folder_id_from_folder_path(folder_name)
request_body = {"destinationId": folder_id}
url = f"/users/{self.mailbox_name}/messages/{message_id}/move"
resp = self._request_with_retries("post", url, json=request_body)
if resp.status_code != 201:
raise RuntimeWarning(
f"Failed to move message {resp.status_code}: {resp.json()}"
)
def keepalive(self):
# Not needed
pass
def watch(self, check_callback, check_timeout, config_reloading=None):
"""Checks the mailbox for new messages every n seconds"""
while True:
if config_reloading and config_reloading():
return
sleep(check_timeout)
if config_reloading and config_reloading():
return
check_callback(self)
@lru_cache(maxsize=10)
def _find_folder_id_from_folder_path(self, folder_name: str) -> str:
path_parts = folder_name.split("/")
parent_folder_id = None
if len(path_parts) > 1:
for folder in path_parts[:-1]:
folder_id = self._find_folder_id_with_parent(folder, parent_folder_id)
parent_folder_id = folder_id
return self._find_folder_id_with_parent(path_parts[-1], parent_folder_id)
else:
return self._find_folder_id_with_parent(folder_name, None)
def _get_well_known_folder_id(self, folder_name: str) -> Optional[str]:
folder_key = folder_name.lower().replace(" ", "").replace("-", "")
alias = self._WELL_KNOWN_FOLDERS.get(folder_key)
if alias is None:
return None
url = f"/users/{self.mailbox_name}/mailFolders/{alias}?$select=id,displayName"
folder_resp = self._request_with_retries("get", url)
if folder_resp.status_code != 200:
return None
payload = folder_resp.json()
return payload.get("id")
def _find_folder_id_with_parent(
self, folder_name: str, parent_folder_id: Optional[str]
):
sub_url = ""
if parent_folder_id is not None:
sub_url = f"/{parent_folder_id}/childFolders"
url = f"/users/{self.mailbox_name}/mailFolders{sub_url}"
filter = f"?$filter=displayName eq '{folder_name}'"
folders_resp = self._request_with_retries("get", url + filter)
if folders_resp.status_code != 200:
if parent_folder_id is None:
well_known_folder_id = self._get_well_known_folder_id(folder_name)
if well_known_folder_id:
return well_known_folder_id
raise RuntimeWarning(f"Failed to list folders.{folders_resp.json()}")
folders: list = folders_resp.json()["value"]
matched_folders = [
folder for folder in folders if folder["displayName"] == folder_name
]
if len(matched_folders) == 0:
raise RuntimeError(f"folder {folder_name} not found")
selected_folder = matched_folders[0]
return selected_folder["id"]
-117
View File
@@ -1,117 +0,0 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
from typing import cast
from time import sleep
from imapclient.exceptions import IMAPClientError
from mailsuite.imap import IMAPClient
from socket import timeout
from parsedmarc.log import logger
from parsedmarc.mail.mailbox_connection import MailboxConnection
class IMAPConnection(MailboxConnection):
def __init__(
self,
host: str,
user: str,
password: str,
port: int = 993,
ssl: bool = True,
verify: bool = True,
timeout: int = 30,
max_retries: int = 4,
):
self._username = user
self._password = password
self._verify = verify
self._client = IMAPClient(
host,
user,
password,
port=port,
ssl=ssl,
verify=verify,
timeout=timeout,
max_retries=max_retries,
)
def create_folder(self, folder_name: str):
self._client.create_folder(folder_name)
def fetch_messages(self, reports_folder: str, **kwargs):
self._client.select_folder(reports_folder)
since = kwargs.get("since")
if since is not None:
return self._client.search(f"SINCE {since}")
else:
return self._client.search()
def fetch_message(self, message_id: int):
return cast(str, self._client.fetch_message(message_id, parse=False))
def delete_message(self, message_id: int):
try:
self._client.delete_messages([message_id])
except IMAPClientError as error:
logger.warning(
"IMAP delete fallback for message %s due to server error: %s",
message_id,
error,
)
self._client.add_flags([message_id], [r"\Deleted"], silent=True)
self._client.expunge()
def move_message(self, message_id: int, folder_name: str):
try:
self._client.move_messages([message_id], folder_name)
except IMAPClientError as error:
logger.warning(
"IMAP move fallback for message %s due to server error: %s",
message_id,
error,
)
self._client.copy([message_id], folder_name)
self.delete_message(message_id)
def keepalive(self):
self._client.noop()
def watch(self, check_callback, check_timeout, config_reloading=None):
"""
Use an IDLE IMAP connection to parse incoming emails,
and pass the results to a callback function
"""
# IDLE callback sends IMAPClient object,
# send back the imap connection object instead
def idle_callback_wrapper(client: IMAPClient):
self._client = client
check_callback(self)
while True:
if config_reloading and config_reloading():
return
try:
IMAPClient(
host=self._client.host,
username=self._username,
password=self._password,
port=self._client.port,
ssl=self._client.ssl,
verify=self._verify,
idle_callback=idle_callback_wrapper,
idle_timeout=check_timeout,
)
except (timeout, IMAPClientError):
logger.warning("IMAP connection timeout. Reconnecting...")
sleep(check_timeout)
except Exception as e:
logger.warning("IMAP connection error. {0}. Reconnecting...".format(e))
sleep(check_timeout)
if config_reloading and config_reloading():
return
-32
View File
@@ -1,32 +0,0 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
from abc import ABC
class MailboxConnection(ABC):
"""
Interface for a mailbox connection
"""
def create_folder(self, folder_name: str):
raise NotImplementedError
def fetch_messages(self, reports_folder: str, **kwargs):
raise NotImplementedError
def fetch_message(self, message_id) -> str:
raise NotImplementedError
def delete_message(self, message_id):
raise NotImplementedError
def move_message(self, message_id, folder_name: str):
raise NotImplementedError
def keepalive(self):
raise NotImplementedError
def watch(self, check_callback, check_timeout, config_reloading=None):
raise NotImplementedError
-105
View File
@@ -1,105 +0,0 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
import mailbox
import os
from time import sleep
from typing import Dict
from parsedmarc.log import logger
from parsedmarc.mail.mailbox_connection import MailboxConnection
class MaildirConnection(MailboxConnection):
def __init__(
self,
maildir_path: str,
maildir_create: bool = False,
):
self._maildir_path = maildir_path
self._maildir_create = maildir_create
try:
maildir_owner = os.stat(maildir_path).st_uid
except OSError:
maildir_owner = None
current_uid = os.getuid()
if maildir_owner is not None and current_uid != maildir_owner:
if current_uid == 0:
try:
logger.warning(
"Switching uid to {} to access Maildir".format(maildir_owner)
)
os.setuid(maildir_owner)
except OSError as e:
logger.warning(
"Failed to switch uid to {}: {}".format(maildir_owner, e)
)
else:
logger.warning(
"Runtime uid {} differs from maildir {} owner {}. "
"Access may fail if permissions are insufficient.".format(
current_uid, maildir_path, maildir_owner
)
)
if maildir_create:
for subdir in ("cur", "new", "tmp"):
os.makedirs(os.path.join(maildir_path, subdir), exist_ok=True)
self._client = mailbox.Maildir(maildir_path, create=maildir_create)
self._active_folder: mailbox.Maildir = self._client
self._subfolder_client: Dict[str, mailbox.Maildir] = {}
def _get_folder(self, folder_name: str) -> mailbox.Maildir:
"""Return a cached subfolder handle, creating it if needed."""
if folder_name not in self._subfolder_client:
self._subfolder_client[folder_name] = self._client.add_folder(folder_name)
return self._subfolder_client[folder_name]
def create_folder(self, folder_name: str):
self._get_folder(folder_name)
def fetch_messages(self, reports_folder: str, **kwargs):
if reports_folder and reports_folder != "INBOX":
self._active_folder = self._get_folder(reports_folder)
else:
self._active_folder = self._client
return self._active_folder.keys()
def fetch_message(self, message_id: str, **kwargs) -> str:
msg = self._active_folder.get(message_id)
if msg is None:
return ""
msg_str = msg.as_string()
if kwargs.get("mark_read"):
# Maildir spec: a message is "read" once it has been moved out of
# new/ into cur/ with the "S" (Seen) flag set in its info field.
msg.set_subdir("cur")
msg.add_flag("S")
self._active_folder[message_id] = msg
return msg_str or ""
def delete_message(self, message_id: str):
self._active_folder.remove(message_id)
def move_message(self, message_id: str, folder_name: str):
message_data = self._active_folder.get(message_id)
if message_data is None:
return
dest = self._get_folder(folder_name)
dest.add(message_data)
self._active_folder.remove(message_id)
def keepalive(self):
return
def watch(self, check_callback, check_timeout, config_reloading=None):
while True:
if config_reloading and config_reloading():
return
try:
check_callback(self)
except Exception as e:
logger.warning("Maildir init error. {0}".format(e))
if config_reloading and config_reloading():
return
sleep(check_timeout)
+1 -8
View File
@@ -39,17 +39,10 @@ dependencies = [
"elasticsearch-dsl==7.4.0",
"elasticsearch<7.14.0",
"expiringdict>=1.1.4",
"google-api-core>=2.4.0",
"google-api-python-client>=2.35.0",
"google-auth-httplib2>=0.1.0",
"google-auth-oauthlib>=0.4.6",
"google-auth>=2.3.3",
"imapclient>=3.1.0",
"kafka-python-ng>=2.2.2",
"lxml>=4.4.0",
"mailsuite>=1.11.2",
"mailsuite[gmail,msgraph]>=2.0.0",
"maxminddb>=2.0.0",
"msgraph-core==0.2.2",
"opensearch-py>=2.4.2,<=4.0.0",
"publicsuffixlist>=0.10.0",
"pygelf>=0.4.2",
+1 -1072
View File
File diff suppressed because it is too large Load Diff