test(mail): convert test_preprocessor.py to pytest style

This commit is contained in:
Trenton H
2026-04-30 08:56:26 -07:00
parent e40cd6048d
commit 4872bfbeae
+140 -112
View File
@@ -3,24 +3,28 @@ import email.contentmanager
import shutil import shutil
import subprocess import subprocess
import tempfile import tempfile
from collections.abc import Generator
from email.message import Message from email.message import Message
from email.mime.application import MIMEApplication from email.mime.application import MIMEApplication
from email.mime.multipart import MIMEMultipart from email.mime.multipart import MIMEMultipart
from unittest import mock
import gnupg import gnupg
from django.test import override_settings import pytest
from imap_tools import MailMessage from imap_tools import MailMessage
from paperless_mail.mail import MailAccountHandler from paperless_mail.mail import MailAccountHandler
from paperless_mail.models import MailRule from paperless_mail.models import MailRule
from paperless_mail.preprocessor import MailMessageDecryptor from paperless_mail.preprocessor import MailMessageDecryptor
from paperless_mail.tests.factories import MailAccountFactory from paperless_mail.tests.factories import MailAccountFactory
from paperless_mail.tests.test_mail import TestMail
from paperless_mail.tests.test_mail import _AttachmentDef from paperless_mail.tests.test_mail import _AttachmentDef
class MessageEncryptor: class MessageEncryptor:
"""
Test helper: generates a throwaway GPG keypair in a tempdir and exposes
`encrypt(MailMessage) -> MailMessage` plus `cleanup()`.
"""
def __init__(self) -> None: def __init__(self) -> None:
self.gpg_home = tempfile.mkdtemp() self.gpg_home = tempfile.mkdtemp()
self.gpg = gnupg.GPG(gnupghome=self.gpg_home) self.gpg = gnupg.GPG(gnupghome=self.gpg_home)
@@ -74,7 +78,7 @@ class MessageEncryptor:
] ]
return message_copy.as_bytes() return message_copy.as_bytes()
def encrypt(self, message): def encrypt(self, message) -> MailMessage:
original_email: email.message.Message = message.obj original_email: email.message.Message = message.obj
encrypted_data = self.gpg.encrypt( encrypted_data = self.gpg.encrypt(
self.get_email_body_without_headers(original_email), self.get_email_body_without_headers(original_email),
@@ -104,87 +108,126 @@ class MessageEncryptor:
) )
new_email.attach(encrypted_part) new_email.attach(encrypted_part)
encrypted_message: MailMessage = MailMessage( return MailMessage(
[(f"UID {message.uid}".encode(), new_email.as_bytes())], [(f"UID {message.uid}".encode(), new_email.as_bytes())],
) )
return encrypted_message
class TestMailMessageGpgDecryptor(TestMail): @pytest.fixture(scope="session")
@classmethod def message_encryptor() -> Generator[MessageEncryptor, None, None]:
def setUpClass(cls) -> None: """
"""Create GPG encryptor once for all tests in this class.""" Session-scoped: GPG keypair generation is slow (~1s+), and nothing in
super().setUpClass() these tests mutates the keyring after creation.
cls.messageEncryptor = MessageEncryptor() """
encryptor = MessageEncryptor()
yield encryptor
encryptor.cleanup()
@classmethod
def tearDownClass(cls) -> None:
"""Clean up GPG resources after all tests complete."""
if hasattr(cls, "messageEncryptor"):
cls.messageEncryptor.cleanup()
super().tearDownClass()
def setUp(self) -> None: @pytest.fixture()
with override_settings( def gpg_settings(settings, message_encryptor: MessageEncryptor):
EMAIL_GNUPG_HOME=self.messageEncryptor.gpg_home, settings.EMAIL_GNUPG_HOME = message_encryptor.gpg_home
EMAIL_ENABLE_GPG_DECRYPTOR=True, settings.EMAIL_ENABLE_GPG_DECRYPTOR = True
): return settings
super().setUp()
def test_preprocessor_is_able_to_run(self) -> None:
with override_settings(
EMAIL_GNUPG_HOME=self.messageEncryptor.gpg_home,
EMAIL_ENABLE_GPG_DECRYPTOR=True,
):
self.assertTrue(MailMessageDecryptor.able_to_run())
def test_preprocessor_is_able_to_run2(self) -> None: @pytest.fixture()
with override_settings( def encrypted_pair(mail_mocker, message_encryptor: MessageEncryptor):
EMAIL_GNUPG_HOME=None, """
EMAIL_ENABLE_GPG_DECRYPTOR=True, Build a (encrypted, plaintext) MailMessage pair sharing the same UID and
): headers, with two PDF attachments on the plaintext side.
self.assertTrue(MailMessageDecryptor.able_to_run()) """
plaintext = mail_mocker.messageBuilder.create_message(
body="Test message with 2 attachments",
attachments=[
_AttachmentDef(filename="f1.pdf", disposition="inline"),
_AttachmentDef(filename="f2.pdf"),
],
)
encrypted = message_encryptor.encrypt(plaintext)
return encrypted, plaintext
def test_is_not_able_to_run_disabled(self) -> None:
with override_settings(
EMAIL_ENABLE_GPG_DECRYPTOR=False,
):
self.assertFalse(MailMessageDecryptor.able_to_run())
def test_is_not_able_to_run_bogus_path(self) -> None: class TestMailMessageDecryptorAbleToRun:
with override_settings( """`MailMessageDecryptor.able_to_run()` configuration matrix — no DB needed."""
EMAIL_ENABLE_GPG_DECRYPTOR=True,
EMAIL_GNUPG_HOME="_)@# notapath &%#$",
):
self.assertFalse(MailMessageDecryptor.able_to_run())
def test_fails_at_initialization(self) -> None: @pytest.mark.parametrize(
with ( ("settings_overrides", "expected"),
mock.patch("gnupg.GPG.__init__") as mock_run, [
override_settings( pytest.param(
EMAIL_ENABLE_GPG_DECRYPTOR=True, {
"EMAIL_GNUPG_HOME": "_gpg_home_marker",
"EMAIL_ENABLE_GPG_DECRYPTOR": True,
},
True,
id="enabled-with-valid-home",
), ),
): pytest.param(
{"EMAIL_GNUPG_HOME": None, "EMAIL_ENABLE_GPG_DECRYPTOR": True},
True,
id="enabled-with-default-home",
),
pytest.param(
{"EMAIL_ENABLE_GPG_DECRYPTOR": False},
False,
id="disabled",
),
pytest.param(
{
"EMAIL_ENABLE_GPG_DECRYPTOR": True,
"EMAIL_GNUPG_HOME": "_)@# notapath &%#$",
},
False,
id="enabled-with-bogus-path",
),
],
)
def test_able_to_run(
self,
settings,
message_encryptor: MessageEncryptor,
settings_overrides: dict,
*,
expected: bool,
) -> None:
for key, value in settings_overrides.items():
if value == "_gpg_home_marker":
value = message_encryptor.gpg_home
setattr(settings, key, value)
assert MailMessageDecryptor.able_to_run() is expected
def side_effect(*args, **kwargs):
raise OSError("Cannot find 'gpg' binary")
mock_run.side_effect = side_effect @pytest.mark.django_db
class TestMailMessageDecryptor:
"""End-to-end decrypt and consumption flow with a real GPG keyring."""
handler = MailAccountHandler() def test_fails_at_initialization(self, settings, mocker) -> None:
self.assertEqual(len(handler._message_preprocessors), 0) settings.EMAIL_ENABLE_GPG_DECRYPTOR = True
mocker.patch(
"gnupg.GPG.__init__",
side_effect=OSError("Cannot find 'gpg' binary"),
)
def test_decrypt_fails(self) -> None: handler = MailAccountHandler()
encrypted_message, _ = self.create_encrypted_unencrypted_message_pair()
assert len(handler._message_preprocessors) == 0
def test_decrypt_fails(self, settings, encrypted_pair) -> None:
"""
A decryptor pointed at a fresh empty GPG home cannot decrypt the
message — ensure it surfaces an exception rather than silently passing
bytes through.
"""
encrypted_message, _ = encrypted_pair
# This test creates its own empty GPG home to test decryption failure # This test creates its own empty GPG home to test decryption failure
empty_gpg_home = tempfile.mkdtemp() empty_gpg_home = tempfile.mkdtemp()
try: try:
with override_settings( settings.EMAIL_ENABLE_GPG_DECRYPTOR = True
EMAIL_ENABLE_GPG_DECRYPTOR=True, settings.EMAIL_GNUPG_HOME = empty_gpg_home
EMAIL_GNUPG_HOME=empty_gpg_home,
): decryptor = MailMessageDecryptor()
message_decryptor = MailMessageDecryptor() with pytest.raises(Exception):
self.assertRaises(Exception, message_decryptor.run, encrypted_message) decryptor.run(encrypted_message)
finally: finally:
# Clean up the temporary GPG home used only by this test # Clean up the temporary GPG home used only by this test
try: try:
@@ -199,57 +242,43 @@ class TestMailMessageGpgDecryptor(TestMail):
pass pass
shutil.rmtree(empty_gpg_home, ignore_errors=True) shutil.rmtree(empty_gpg_home, ignore_errors=True)
def test_decrypt_encrypted_mail(self) -> None: def test_decrypt_encrypted_mail(self, gpg_settings, encrypted_pair) -> None:
""" """
Creates a mail with attachments. Then encrypts it with a new key. Creates a mail with attachments. Then encrypts it with a new key.
Verifies that this encrypted message can be decrypted with attachments intact. Verifies that this encrypted message can be decrypted with attachments intact.
""" """
encrypted_message, message = self.create_encrypted_unencrypted_message_pair() encrypted_message, plaintext = encrypted_pair
headers = message.headers headers = plaintext.headers
text = message.text text = plaintext.text
self.assertEqual(len(encrypted_message.attachments), 1) assert len(encrypted_message.attachments) == 1
self.assertEqual(encrypted_message.attachments[0].filename, "encrypted.asc") assert encrypted_message.attachments[0].filename == "encrypted.asc"
self.assertEqual(encrypted_message.text, "") assert encrypted_message.text == ""
with override_settings( decryptor = MailMessageDecryptor()
EMAIL_ENABLE_GPG_DECRYPTOR=True, assert decryptor.able_to_run()
EMAIL_GNUPG_HOME=self.messageEncryptor.gpg_home, decrypted = decryptor.run(encrypted_message)
):
message_decryptor = MailMessageDecryptor()
self.assertTrue(message_decryptor.able_to_run())
decrypted_message = message_decryptor.run(encrypted_message)
self.assertEqual(len(decrypted_message.attachments), 2) assert len(decrypted.attachments) == 2
self.assertEqual(decrypted_message.attachments[0].filename, "f1.pdf") assert decrypted.attachments[0].filename == "f1.pdf"
self.assertEqual(decrypted_message.attachments[1].filename, "f2.pdf") assert decrypted.attachments[1].filename == "f2.pdf"
self.assertEqual(decrypted_message.headers, headers) assert decrypted.headers == headers
self.assertEqual(decrypted_message.text, text) assert decrypted.text == text
self.assertEqual(decrypted_message.uid, message.uid) assert decrypted.uid == plaintext.uid
def create_encrypted_unencrypted_message_pair(self): def test_handle_encrypted_message(
message = self.mailMocker.messageBuilder.create_message( self,
body="Test message with 2 attachments", gpg_settings,
attachments=[ mail_mocker,
_AttachmentDef( message_encryptor: MessageEncryptor,
filename="f1.pdf", ) -> None:
disposition="inline", plaintext = mail_mocker.messageBuilder.create_message(
),
_AttachmentDef(filename="f2.pdf"),
],
)
encrypted_message = self.messageEncryptor.encrypt(message)
return encrypted_message, message
def test_handle_encrypted_message(self) -> None:
message = self.mailMocker.messageBuilder.create_message(
subject="the message title", subject="the message title",
from_="Myself", from_="Myself",
attachments=2, attachments=2,
body="Test mail", body="Test mail",
) )
encrypted = message_encryptor.encrypt(plaintext)
encrypted_message = self.messageEncryptor.encrypt(message)
account = MailAccountFactory() account = MailAccountFactory()
rule = MailRule( rule = MailRule(
@@ -259,18 +288,17 @@ class TestMailMessageGpgDecryptor(TestMail):
) )
rule.save() rule.save()
result = self.mail_account_handler._handle_message(encrypted_message, rule) handler = MailAccountHandler()
result = handler._handle_message(encrypted, rule)
self.assertEqual(result, 3) assert result == 3
mail_mocker._queue_consumption_tasks_mock.assert_called()
self.mailMocker._queue_consumption_tasks_mock.assert_called() mail_mocker.assert_queue_consumption_tasks_call_args(
self.mailMocker.assert_queue_consumption_tasks_call_args(
[ [
[ [
{ {
"override_title": message.subject, "override_title": plaintext.subject,
"override_filename": f"{message.subject}.eml", "override_filename": f"{plaintext.subject}.eml",
}, },
], ],
[ [