test(mail): convert test_preprocessor.py to pytest style

This commit is contained in:
Trenton H
2026-04-30 08:56:26 -07:00
committed by stumpylog
parent ff7be6371b
commit 05b9ac0ddb
+140 -112
View File
@@ -3,24 +3,28 @@ import email.contentmanager
import shutil
import subprocess
import tempfile
from collections.abc import Generator
from email.message import Message
from email.mime.application import MIMEApplication
from email.mime.multipart import MIMEMultipart
from unittest import mock
import gnupg
from django.test import override_settings
import pytest
from imap_tools import MailMessage
from paperless_mail.mail import MailAccountHandler
from paperless_mail.models import MailRule
from paperless_mail.preprocessor import MailMessageDecryptor
from paperless_mail.tests.factories import MailAccountFactory
from paperless_mail.tests.test_mail import TestMail
from paperless_mail.tests.test_mail import _AttachmentDef
class MessageEncryptor:
"""
Test helper: generates a throwaway GPG keypair in a tempdir and exposes
`encrypt(MailMessage) -> MailMessage` plus `cleanup()`.
"""
def __init__(self) -> None:
self.gpg_home = tempfile.mkdtemp()
self.gpg = gnupg.GPG(gnupghome=self.gpg_home)
@@ -74,7 +78,7 @@ class MessageEncryptor:
]
return message_copy.as_bytes()
def encrypt(self, message):
def encrypt(self, message) -> MailMessage:
original_email: email.message.Message = message.obj
encrypted_data = self.gpg.encrypt(
self.get_email_body_without_headers(original_email),
@@ -104,87 +108,126 @@ class MessageEncryptor:
)
new_email.attach(encrypted_part)
encrypted_message: MailMessage = MailMessage(
return MailMessage(
[(f"UID {message.uid}".encode(), new_email.as_bytes())],
)
return encrypted_message
class TestMailMessageGpgDecryptor(TestMail):
@classmethod
def setUpClass(cls) -> None:
"""Create GPG encryptor once for all tests in this class."""
super().setUpClass()
cls.messageEncryptor = MessageEncryptor()
@pytest.fixture(scope="session")
def message_encryptor() -> Generator[MessageEncryptor, None, None]:
"""
Session-scoped: GPG keypair generation is slow (~1s+), and nothing in
these tests mutates the keyring after creation.
"""
encryptor = MessageEncryptor()
yield encryptor
encryptor.cleanup()
@classmethod
def tearDownClass(cls) -> None:
"""Clean up GPG resources after all tests complete."""
if hasattr(cls, "messageEncryptor"):
cls.messageEncryptor.cleanup()
super().tearDownClass()
def setUp(self) -> None:
with override_settings(
EMAIL_GNUPG_HOME=self.messageEncryptor.gpg_home,
EMAIL_ENABLE_GPG_DECRYPTOR=True,
):
super().setUp()
@pytest.fixture()
def gpg_settings(settings, message_encryptor: MessageEncryptor):
settings.EMAIL_GNUPG_HOME = message_encryptor.gpg_home
settings.EMAIL_ENABLE_GPG_DECRYPTOR = True
return settings
def test_preprocessor_is_able_to_run(self) -> None:
with override_settings(
EMAIL_GNUPG_HOME=self.messageEncryptor.gpg_home,
EMAIL_ENABLE_GPG_DECRYPTOR=True,
):
self.assertTrue(MailMessageDecryptor.able_to_run())
def test_preprocessor_is_able_to_run2(self) -> None:
with override_settings(
EMAIL_GNUPG_HOME=None,
EMAIL_ENABLE_GPG_DECRYPTOR=True,
):
self.assertTrue(MailMessageDecryptor.able_to_run())
@pytest.fixture()
def encrypted_pair(mail_mocker, message_encryptor: MessageEncryptor):
"""
Build a (encrypted, plaintext) MailMessage pair sharing the same UID and
headers, with two PDF attachments on the plaintext side.
"""
plaintext = mail_mocker.messageBuilder.create_message(
body="Test message with 2 attachments",
attachments=[
_AttachmentDef(filename="f1.pdf", disposition="inline"),
_AttachmentDef(filename="f2.pdf"),
],
)
encrypted = message_encryptor.encrypt(plaintext)
return encrypted, plaintext
def test_is_not_able_to_run_disabled(self) -> None:
with override_settings(
EMAIL_ENABLE_GPG_DECRYPTOR=False,
):
self.assertFalse(MailMessageDecryptor.able_to_run())
def test_is_not_able_to_run_bogus_path(self) -> None:
with override_settings(
EMAIL_ENABLE_GPG_DECRYPTOR=True,
EMAIL_GNUPG_HOME="_)@# notapath &%#$",
):
self.assertFalse(MailMessageDecryptor.able_to_run())
class TestMailMessageDecryptorAbleToRun:
"""`MailMessageDecryptor.able_to_run()` configuration matrix — no DB needed."""
def test_fails_at_initialization(self) -> None:
with (
mock.patch("gnupg.GPG.__init__") as mock_run,
override_settings(
EMAIL_ENABLE_GPG_DECRYPTOR=True,
@pytest.mark.parametrize(
("settings_overrides", "expected"),
[
pytest.param(
{
"EMAIL_GNUPG_HOME": "_gpg_home_marker",
"EMAIL_ENABLE_GPG_DECRYPTOR": True,
},
True,
id="enabled-with-valid-home",
),
):
pytest.param(
{"EMAIL_GNUPG_HOME": None, "EMAIL_ENABLE_GPG_DECRYPTOR": True},
True,
id="enabled-with-default-home",
),
pytest.param(
{"EMAIL_ENABLE_GPG_DECRYPTOR": False},
False,
id="disabled",
),
pytest.param(
{
"EMAIL_ENABLE_GPG_DECRYPTOR": True,
"EMAIL_GNUPG_HOME": "_)@# notapath &%#$",
},
False,
id="enabled-with-bogus-path",
),
],
)
def test_able_to_run(
self,
settings,
message_encryptor: MessageEncryptor,
settings_overrides: dict,
*,
expected: bool,
) -> None:
for key, value in settings_overrides.items():
if value == "_gpg_home_marker":
value = message_encryptor.gpg_home
setattr(settings, key, value)
assert MailMessageDecryptor.able_to_run() is expected
def side_effect(*args, **kwargs):
raise OSError("Cannot find 'gpg' binary")
mock_run.side_effect = side_effect
@pytest.mark.django_db
class TestMailMessageDecryptor:
"""End-to-end decrypt and consumption flow with a real GPG keyring."""
handler = MailAccountHandler()
self.assertEqual(len(handler._message_preprocessors), 0)
def test_fails_at_initialization(self, settings, mocker) -> None:
settings.EMAIL_ENABLE_GPG_DECRYPTOR = True
mocker.patch(
"gnupg.GPG.__init__",
side_effect=OSError("Cannot find 'gpg' binary"),
)
def test_decrypt_fails(self) -> None:
encrypted_message, _ = self.create_encrypted_unencrypted_message_pair()
handler = MailAccountHandler()
assert len(handler._message_preprocessors) == 0
def test_decrypt_fails(self, settings, encrypted_pair) -> None:
"""
A decryptor pointed at a fresh empty GPG home cannot decrypt the
message — ensure it surfaces an exception rather than silently passing
bytes through.
"""
encrypted_message, _ = encrypted_pair
# This test creates its own empty GPG home to test decryption failure
empty_gpg_home = tempfile.mkdtemp()
try:
with override_settings(
EMAIL_ENABLE_GPG_DECRYPTOR=True,
EMAIL_GNUPG_HOME=empty_gpg_home,
):
message_decryptor = MailMessageDecryptor()
self.assertRaises(Exception, message_decryptor.run, encrypted_message)
settings.EMAIL_ENABLE_GPG_DECRYPTOR = True
settings.EMAIL_GNUPG_HOME = empty_gpg_home
decryptor = MailMessageDecryptor()
with pytest.raises(Exception):
decryptor.run(encrypted_message)
finally:
# Clean up the temporary GPG home used only by this test
try:
@@ -199,57 +242,43 @@ class TestMailMessageGpgDecryptor(TestMail):
pass
shutil.rmtree(empty_gpg_home, ignore_errors=True)
def test_decrypt_encrypted_mail(self) -> None:
def test_decrypt_encrypted_mail(self, gpg_settings, encrypted_pair) -> None:
"""
Creates a mail with attachments. Then encrypts it with a new key.
Verifies that this encrypted message can be decrypted with attachments intact.
"""
encrypted_message, message = self.create_encrypted_unencrypted_message_pair()
headers = message.headers
text = message.text
encrypted_message, plaintext = encrypted_pair
headers = plaintext.headers
text = plaintext.text
self.assertEqual(len(encrypted_message.attachments), 1)
self.assertEqual(encrypted_message.attachments[0].filename, "encrypted.asc")
self.assertEqual(encrypted_message.text, "")
assert len(encrypted_message.attachments) == 1
assert encrypted_message.attachments[0].filename == "encrypted.asc"
assert encrypted_message.text == ""
with override_settings(
EMAIL_ENABLE_GPG_DECRYPTOR=True,
EMAIL_GNUPG_HOME=self.messageEncryptor.gpg_home,
):
message_decryptor = MailMessageDecryptor()
self.assertTrue(message_decryptor.able_to_run())
decrypted_message = message_decryptor.run(encrypted_message)
decryptor = MailMessageDecryptor()
assert decryptor.able_to_run()
decrypted = decryptor.run(encrypted_message)
self.assertEqual(len(decrypted_message.attachments), 2)
self.assertEqual(decrypted_message.attachments[0].filename, "f1.pdf")
self.assertEqual(decrypted_message.attachments[1].filename, "f2.pdf")
self.assertEqual(decrypted_message.headers, headers)
self.assertEqual(decrypted_message.text, text)
self.assertEqual(decrypted_message.uid, message.uid)
assert len(decrypted.attachments) == 2
assert decrypted.attachments[0].filename == "f1.pdf"
assert decrypted.attachments[1].filename == "f2.pdf"
assert decrypted.headers == headers
assert decrypted.text == text
assert decrypted.uid == plaintext.uid
def create_encrypted_unencrypted_message_pair(self):
message = self.mailMocker.messageBuilder.create_message(
body="Test message with 2 attachments",
attachments=[
_AttachmentDef(
filename="f1.pdf",
disposition="inline",
),
_AttachmentDef(filename="f2.pdf"),
],
)
encrypted_message = self.messageEncryptor.encrypt(message)
return encrypted_message, message
def test_handle_encrypted_message(self) -> None:
message = self.mailMocker.messageBuilder.create_message(
def test_handle_encrypted_message(
self,
gpg_settings,
mail_mocker,
message_encryptor: MessageEncryptor,
) -> None:
plaintext = mail_mocker.messageBuilder.create_message(
subject="the message title",
from_="Myself",
attachments=2,
body="Test mail",
)
encrypted_message = self.messageEncryptor.encrypt(message)
encrypted = message_encryptor.encrypt(plaintext)
account = MailAccountFactory()
rule = MailRule(
@@ -259,18 +288,17 @@ class TestMailMessageGpgDecryptor(TestMail):
)
rule.save()
result = self.mail_account_handler._handle_message(encrypted_message, rule)
handler = MailAccountHandler()
result = handler._handle_message(encrypted, rule)
self.assertEqual(result, 3)
self.mailMocker._queue_consumption_tasks_mock.assert_called()
self.mailMocker.assert_queue_consumption_tasks_call_args(
assert result == 3
mail_mocker._queue_consumption_tasks_mock.assert_called()
mail_mocker.assert_queue_consumption_tasks_call_args(
[
[
{
"override_title": message.subject,
"override_filename": f"{message.subject}.eml",
"override_title": plaintext.subject,
"override_filename": f"{plaintext.subject}.eml",
},
],
[