Compare commits

...

8 Commits

4 changed files with 886 additions and 686 deletions
+108
View File
@@ -1,11 +1,19 @@
from collections.abc import Generator
from typing import TYPE_CHECKING
import pytest
import pytest_mock
from django.contrib.auth.models import User
from django.test import Client
from rest_framework.test import APIClient
from paperless_mail.mail import MailAccountHandler
from paperless_mail.models import MailAccount
from paperless_mail.tests.factories import MailAccountFactory
if TYPE_CHECKING:
from paperless_mail.tests.test_mail import BogusMailBox
@pytest.fixture()
def greenmail_mail_account(db: None) -> Generator[MailAccount, None, None]:
@@ -27,3 +35,103 @@ def greenmail_mail_account(db: None) -> Generator[MailAccount, None, None]:
@pytest.fixture()
def mail_account_handler() -> MailAccountHandler:
return MailAccountHandler()
@pytest.fixture()
def mail_user(
db: None,
django_user_model,
client: Client,
):
"""
Create a user with the `add_mailaccount` permission and log them in via
the test client. Returned so tests can mutate permissions if needed.
"""
from django.contrib.auth.models import Permission
user = django_user_model.objects.create_user("testuser")
user.user_permissions.add(
*Permission.objects.filter(codename__in=["add_mailaccount"]),
)
user.save()
client.force_login(user)
return user
@pytest.fixture()
def oauth_settings(settings):
"""
Apply the OAuth callback / client-id settings the OAuth flow needs. Uses
pytest-django's `settings` fixture so values are reverted automatically.
"""
settings.OAUTH_CALLBACK_BASE_URL = "http://localhost:8000"
settings.GMAIL_OAUTH_CLIENT_ID = "test_gmail_client_id"
settings.GMAIL_OAUTH_CLIENT_SECRET = "test_gmail_client_secret"
settings.OUTLOOK_OAUTH_CLIENT_ID = "test_outlook_client_id"
settings.OUTLOOK_OAUTH_CLIENT_SECRET = "test_outlook_client_secret"
return settings
@pytest.fixture()
def mail_mocker(db: None):
"""
Provides a MailMocker instance with its `MailBox` and
`queue_consumption_tasks` patches active. Cleanups registered via
TestCase.addCleanup are run on teardown by calling doCleanups().
"""
from paperless_mail.tests.test_mail import MailMocker
mocker = MailMocker()
mocker.setUp()
try:
yield mocker
finally:
mocker.doCleanups()
@pytest.fixture()
def mail_api_user(
db: None,
django_user_model: type[User],
) -> User:
"""
Fully-permissioned (regular) user used by the mail API tests.
Has every model-level permission but is NOT a Django superuser/staff:
the owner-aware filtering and bulk_delete permission tests rely on
django-guardian's object-level checks, and `is_superuser` short-circuits
those checks. The name avoids `admin` to make this distinction explicit.
"""
from django.contrib.auth.models import Permission
user = django_user_model.objects.create_user(username="mail_api_user")
user.user_permissions.add(*Permission.objects.all())
user.save()
return user
@pytest.fixture()
def mail_api_client(mail_api_user: User) -> APIClient:
"""
DRF APIClient force-authenticated as `mail_api_user` and pinned to API v10
via the Accept header (matches `documents/tests/conftest.py:admin_client`).
"""
client = APIClient()
client.force_authenticate(user=mail_api_user)
client.credentials(HTTP_ACCEPT="application/json; version=10")
return client
@pytest.fixture()
def bogus_mailbox(mocker: pytest_mock.MockerFixture) -> "BogusMailBox":
"""
Patch `paperless_mail.mail.MailBox` with a `BogusMailBox` instance so the
`/api/mail_accounts/test/` endpoint can run without a real IMAP server.
Returns the bogus mailbox so tests can introspect/manipulate it.
"""
from paperless_mail.tests.test_mail import BogusMailBox
mailbox = BogusMailBox()
mock_mailbox_cls = mocker.patch("paperless_mail.mail.MailBox")
mock_mailbox_cls.return_value = mailbox
return mailbox
+305 -273
View File
@@ -1,44 +1,39 @@
import json
from unittest import mock
from typing import TYPE_CHECKING
from django.contrib.auth.models import Permission
import pytest
from django.contrib.auth.models import User
from guardian.shortcuts import assign_perm
from rest_framework import status
from rest_framework.test import APITestCase
from rest_framework.test import APIClient
from documents.tests.factories import CorrespondentFactory
from documents.tests.factories import DocumentTypeFactory
from documents.tests.factories import TagFactory
from documents.tests.utils import DirectoriesMixin
from paperless_mail.models import MailAccount
from paperless_mail.models import MailRule
from paperless_mail.models import ProcessedMail
from paperless_mail.tests.factories import MailAccountFactory
from paperless_mail.tests.factories import MailRuleFactory
from paperless_mail.tests.factories import ProcessedMailFactory
from paperless_mail.tests.test_mail import BogusMailBox
if TYPE_CHECKING:
from paperless_mail.tests.test_mail import BogusMailBox
class TestAPIMailAccounts(DirectoriesMixin, APITestCase):
ENDPOINT = "/api/mail_accounts/"
MAIL_ACCOUNTS_ENDPOINT = "/api/mail_accounts/"
MAIL_ACCOUNTS_TEST_ENDPOINT = f"{MAIL_ACCOUNTS_ENDPOINT}test/"
MAIL_RULES_ENDPOINT = "/api/mail_rules/"
PROCESSED_MAIL_ENDPOINT = "/api/processed_mail/"
PROCESSED_MAIL_BULK_DELETE_ENDPOINT = f"{PROCESSED_MAIL_ENDPOINT}bulk_delete/"
def setUp(self) -> None:
self.bogus_mailbox = BogusMailBox()
patcher = mock.patch("paperless_mail.mail.MailBox")
m = patcher.start()
m.return_value = self.bogus_mailbox
self.addCleanup(patcher.stop)
super().setUp()
self.user = User.objects.create_user(username="temp_admin")
self.user.user_permissions.add(*Permission.objects.all())
self.user.save()
self.client.force_authenticate(user=self.user)
def test_get_mail_accounts(self) -> None:
@pytest.mark.django_db
class TestAPIMailAccounts:
def test_get_mail_accounts(
self,
mail_api_client: APIClient,
) -> None:
"""
GIVEN:
- Configured mail accounts
@@ -47,7 +42,6 @@ class TestAPIMailAccounts(DirectoriesMixin, APITestCase):
THEN:
- Configured mail accounts are provided
"""
account1 = MailAccountFactory(
name="Email1",
username="username1",
@@ -56,31 +50,30 @@ class TestAPIMailAccounts(DirectoriesMixin, APITestCase):
imap_port=443,
)
response = self.client.get(self.ENDPOINT)
response = mail_api_client.get(MAIL_ACCOUNTS_ENDPOINT)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(response.data["count"], 1)
assert response.status_code == status.HTTP_200_OK
assert response.data["count"] == 1
returned_account1 = response.data["results"][0]
self.assertEqual(returned_account1["name"], account1.name)
self.assertEqual(returned_account1["username"], account1.username)
self.assertEqual(
returned_account1["password"],
"**********",
)
self.assertEqual(returned_account1["imap_server"], account1.imap_server)
self.assertEqual(returned_account1["imap_port"], account1.imap_port)
self.assertEqual(returned_account1["imap_security"], account1.imap_security)
self.assertEqual(returned_account1["character_set"], account1.character_set)
assert returned_account1["name"] == account1.name
assert returned_account1["username"] == account1.username
assert returned_account1["password"] == "**********"
assert returned_account1["imap_server"] == account1.imap_server
assert returned_account1["imap_port"] == account1.imap_port
assert returned_account1["imap_security"] == account1.imap_security
assert returned_account1["character_set"] == account1.character_set
def test_create_mail_account(self) -> None:
def test_create_mail_account(
self,
mail_api_client: APIClient,
) -> None:
"""
WHEN:
- API request is made to add a mail account
THEN:
- A new mail account is created
"""
account1 = {
"name": "Email1",
"username": "username1",
@@ -91,24 +84,27 @@ class TestAPIMailAccounts(DirectoriesMixin, APITestCase):
"character_set": "UTF-8",
}
response = self.client.post(
self.ENDPOINT,
response = mail_api_client.post(
MAIL_ACCOUNTS_ENDPOINT,
data=account1,
)
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
assert response.status_code == status.HTTP_201_CREATED
returned_account1 = MailAccount.objects.get(name="Email1")
self.assertEqual(returned_account1.name, account1["name"])
self.assertEqual(returned_account1.username, account1["username"])
self.assertEqual(returned_account1.password, account1["password"])
self.assertEqual(returned_account1.imap_server, account1["imap_server"])
self.assertEqual(returned_account1.imap_port, account1["imap_port"])
self.assertEqual(returned_account1.imap_security, account1["imap_security"])
self.assertEqual(returned_account1.character_set, account1["character_set"])
assert returned_account1.name == account1["name"]
assert returned_account1.username == account1["username"]
assert returned_account1.password == account1["password"]
assert returned_account1.imap_server == account1["imap_server"]
assert returned_account1.imap_port == account1["imap_port"]
assert returned_account1.imap_security == account1["imap_security"]
assert returned_account1.character_set == account1["character_set"]
def test_delete_mail_account(self) -> None:
def test_delete_mail_account(
self,
mail_api_client: APIClient,
) -> None:
"""
GIVEN:
- Existing mail account
@@ -117,18 +113,20 @@ class TestAPIMailAccounts(DirectoriesMixin, APITestCase):
THEN:
- Account is deleted
"""
account1 = MailAccountFactory()
response = self.client.delete(
f"{self.ENDPOINT}{account1.pk}/",
response = mail_api_client.delete(
f"{MAIL_ACCOUNTS_ENDPOINT}{account1.pk}/",
)
self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT)
assert response.status_code == status.HTTP_204_NO_CONTENT
self.assertEqual(len(MailAccount.objects.all()), 0)
assert MailAccount.objects.count() == 0
def test_update_mail_account(self) -> None:
def test_update_mail_account(
self,
mail_api_client: APIClient,
) -> None:
"""
GIVEN:
- Existing mail accounts
@@ -137,49 +135,51 @@ class TestAPIMailAccounts(DirectoriesMixin, APITestCase):
THEN:
- The mail account is updated, password only updated if not '****'
"""
account1 = MailAccountFactory()
response = self.client.patch(
f"{self.ENDPOINT}{account1.pk}/",
response = mail_api_client.patch(
f"{MAIL_ACCOUNTS_ENDPOINT}{account1.pk}/",
data={
"name": "Updated Name 1",
"password": "******",
},
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
assert response.status_code == status.HTTP_200_OK
returned_account1 = MailAccount.objects.get(pk=account1.pk)
self.assertEqual(returned_account1.name, "Updated Name 1")
self.assertEqual(returned_account1.password, account1.password)
assert returned_account1.name == "Updated Name 1"
assert returned_account1.password == account1.password
response = self.client.patch(
f"{self.ENDPOINT}{account1.pk}/",
response = mail_api_client.patch(
f"{MAIL_ACCOUNTS_ENDPOINT}{account1.pk}/",
data={
"name": "Updated Name 2",
"password": "123xyz",
},
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
assert response.status_code == status.HTTP_200_OK
returned_account2 = MailAccount.objects.get(pk=account1.pk)
self.assertEqual(returned_account2.name, "Updated Name 2")
self.assertEqual(returned_account2.password, "123xyz")
assert returned_account2.name == "Updated Name 2"
assert returned_account2.password == "123xyz"
def test_mail_account_test_fail(self) -> None:
def test_mail_account_test_fail(
self,
mail_api_client: APIClient,
bogus_mailbox: "BogusMailBox",
) -> None:
"""
GIVEN:
- Errnoeous mail account details
- Erroneous mail account details
WHEN:
- API call is made to test account
THEN:
- API returns 400 bad request
"""
response = self.client.post(
f"{self.ENDPOINT}test/",
response = mail_api_client.post(
MAIL_ACCOUNTS_TEST_ENDPOINT,
json.dumps(
{
"imap_server": "server.example.com",
@@ -192,9 +192,13 @@ class TestAPIMailAccounts(DirectoriesMixin, APITestCase):
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
assert response.status_code == status.HTTP_400_BAD_REQUEST
def test_mail_account_test_success(self) -> None:
def test_mail_account_test_success(
self,
mail_api_client: APIClient,
bogus_mailbox: "BogusMailBox",
) -> None:
"""
GIVEN:
- Working mail account details
@@ -203,9 +207,8 @@ class TestAPIMailAccounts(DirectoriesMixin, APITestCase):
THEN:
- API returns success
"""
response = self.client.post(
f"{self.ENDPOINT}test/",
response = mail_api_client.post(
MAIL_ACCOUNTS_TEST_ENDPOINT,
json.dumps(
{
"imap_server": "server.example.com",
@@ -217,10 +220,14 @@ class TestAPIMailAccounts(DirectoriesMixin, APITestCase):
),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(response.data["success"], True)
assert response.status_code == status.HTTP_200_OK
assert response.data["success"] is True
def test_mail_account_test_existing(self) -> None:
def test_mail_account_test_existing(
self,
mail_api_client: APIClient,
bogus_mailbox: "BogusMailBox",
) -> None:
"""
GIVEN:
- Testing server details for an existing account with obfuscated password (***)
@@ -236,8 +243,8 @@ class TestAPIMailAccounts(DirectoriesMixin, APITestCase):
imap_port=443,
)
response = self.client.post(
f"{self.ENDPOINT}test/",
response = mail_api_client.post(
MAIL_ACCOUNTS_TEST_ENDPOINT,
json.dumps(
{
"id": account.pk,
@@ -250,12 +257,16 @@ class TestAPIMailAccounts(DirectoriesMixin, APITestCase):
),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(response.data["success"], True)
assert response.status_code == status.HTTP_200_OK
assert response.data["success"] is True
def test_mail_account_test_existing_nonexistent_id_forbidden(self) -> None:
response = self.client.post(
f"{self.ENDPOINT}test/",
def test_mail_account_test_existing_nonexistent_id_forbidden(
self,
mail_api_client: APIClient,
bogus_mailbox: "BogusMailBox",
) -> None:
response = mail_api_client.post(
MAIL_ACCOUNTS_TEST_ENDPOINT,
json.dumps(
{
"id": 999999,
@@ -268,10 +279,15 @@ class TestAPIMailAccounts(DirectoriesMixin, APITestCase):
),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
self.assertEqual(response.content.decode(), "Insufficient permissions")
assert response.status_code == status.HTTP_403_FORBIDDEN
assert response.content.decode() == "Insufficient permissions"
def test_get_mail_accounts_owner_aware(self) -> None:
def test_get_mail_accounts_owner_aware(
self,
mail_api_client: APIClient,
mail_api_user: User,
django_user_model: type[User],
) -> None:
"""
GIVEN:
- Configured accounts with different users
@@ -280,36 +296,29 @@ class TestAPIMailAccounts(DirectoriesMixin, APITestCase):
THEN:
- Only unowned, owned by user or granted accounts are provided
"""
user2 = User.objects.create_user(username="temp_admin2")
user2 = django_user_model.objects.create_user(username="temp_admin2")
account1 = MailAccountFactory(name="Email1")
account2 = MailAccountFactory(name="Email2", owner=self.user)
account2 = MailAccountFactory(name="Email2", owner=mail_api_user)
_account3 = MailAccountFactory(name="Email3", owner=user2)
account4 = MailAccountFactory(name="Email4", owner=user2)
assign_perm("view_mailaccount", self.user, account4)
assign_perm("view_mailaccount", mail_api_user, account4)
response = self.client.get(self.ENDPOINT)
response = mail_api_client.get(MAIL_ACCOUNTS_ENDPOINT)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(response.data["count"], 3)
self.assertEqual(response.data["results"][0]["name"], account1.name)
self.assertEqual(response.data["results"][1]["name"], account2.name)
self.assertEqual(response.data["results"][2]["name"], account4.name)
assert response.status_code == status.HTTP_200_OK
assert response.data["count"] == 3
assert response.data["results"][0]["name"] == account1.name
assert response.data["results"][1]["name"] == account2.name
assert response.data["results"][2]["name"] == account4.name
class TestAPIMailRules(DirectoriesMixin, APITestCase):
ENDPOINT = "/api/mail_rules/"
def setUp(self) -> None:
super().setUp()
self.user = User.objects.create_user(username="temp_admin")
self.user.user_permissions.add(*Permission.objects.all())
self.user.save()
self.client.force_authenticate(user=self.user)
def test_get_mail_rules(self) -> None:
@pytest.mark.django_db
class TestAPIMailRules:
def test_get_mail_rules(
self,
mail_api_client: APIClient,
) -> None:
"""
GIVEN:
- Configured mail accounts and rules
@@ -318,7 +327,6 @@ class TestAPIMailRules(DirectoriesMixin, APITestCase):
THEN:
- Configured mail rules are provided
"""
account1 = MailAccountFactory()
rule1 = MailRuleFactory(
name="Rule1",
@@ -330,34 +338,37 @@ class TestAPIMailRules(DirectoriesMixin, APITestCase):
filter_attachment_filename_include="file.pdf",
)
response = self.client.get(self.ENDPOINT)
response = mail_api_client.get(MAIL_RULES_ENDPOINT)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(response.data["count"], 1)
assert response.status_code == status.HTTP_200_OK
assert response.data["count"] == 1
returned_rule1 = response.data["results"][0]
self.assertEqual(returned_rule1["name"], rule1.name)
self.assertEqual(returned_rule1["account"], account1.pk)
self.assertEqual(returned_rule1["folder"], rule1.folder)
self.assertEqual(returned_rule1["filter_from"], rule1.filter_from)
self.assertEqual(returned_rule1["filter_to"], rule1.filter_to)
self.assertEqual(returned_rule1["filter_subject"], rule1.filter_subject)
self.assertEqual(returned_rule1["filter_body"], rule1.filter_body)
self.assertEqual(
returned_rule1["filter_attachment_filename_include"],
rule1.filter_attachment_filename_include,
assert returned_rule1["name"] == rule1.name
assert returned_rule1["account"] == account1.pk
assert returned_rule1["folder"] == rule1.folder
assert returned_rule1["filter_from"] == rule1.filter_from
assert returned_rule1["filter_to"] == rule1.filter_to
assert returned_rule1["filter_subject"] == rule1.filter_subject
assert returned_rule1["filter_body"] == rule1.filter_body
assert (
returned_rule1["filter_attachment_filename_include"]
== rule1.filter_attachment_filename_include
)
self.assertEqual(returned_rule1["maximum_age"], rule1.maximum_age)
self.assertEqual(returned_rule1["action"], rule1.action)
self.assertEqual(returned_rule1["assign_title_from"], rule1.assign_title_from)
self.assertEqual(
returned_rule1["assign_correspondent_from"],
rule1.assign_correspondent_from,
assert returned_rule1["maximum_age"] == rule1.maximum_age
assert returned_rule1["action"] == rule1.action
assert returned_rule1["assign_title_from"] == rule1.assign_title_from
assert (
returned_rule1["assign_correspondent_from"]
== rule1.assign_correspondent_from
)
self.assertEqual(returned_rule1["order"], rule1.order)
self.assertEqual(returned_rule1["attachment_type"], rule1.attachment_type)
assert returned_rule1["order"] == rule1.order
assert returned_rule1["attachment_type"] == rule1.attachment_type
def test_create_mail_rule(self) -> None:
def test_create_mail_rule(
self,
mail_api_client: APIClient,
) -> None:
"""
GIVEN:
- Configured mail account exists
@@ -366,7 +377,6 @@ class TestAPIMailRules(DirectoriesMixin, APITestCase):
THEN:
- A new mail rule is created
"""
account1 = MailAccountFactory()
tag = TagFactory(name="t")
correspondent = CorrespondentFactory(name="c")
@@ -394,58 +404,51 @@ class TestAPIMailRules(DirectoriesMixin, APITestCase):
"assign_owner_from_rule": True,
}
response = self.client.post(
self.ENDPOINT,
response = mail_api_client.post(
MAIL_RULES_ENDPOINT,
data=rule1,
)
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
assert response.status_code == status.HTTP_201_CREATED
response = self.client.get(self.ENDPOINT)
response = mail_api_client.get(MAIL_RULES_ENDPOINT)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(response.data["count"], 1)
assert response.status_code == status.HTTP_200_OK
assert response.data["count"] == 1
returned_rule1 = response.data["results"][0]
self.assertEqual(returned_rule1["name"], rule1["name"])
self.assertEqual(returned_rule1["account"], account1.pk)
self.assertEqual(returned_rule1["folder"], rule1["folder"])
self.assertEqual(returned_rule1["filter_from"], rule1["filter_from"])
self.assertEqual(returned_rule1["filter_to"], rule1["filter_to"])
self.assertEqual(returned_rule1["filter_subject"], rule1["filter_subject"])
self.assertEqual(returned_rule1["filter_body"], rule1["filter_body"])
self.assertEqual(
returned_rule1["filter_attachment_filename_include"],
rule1["filter_attachment_filename_include"],
assert returned_rule1["name"] == rule1["name"]
assert returned_rule1["account"] == account1.pk
assert returned_rule1["folder"] == rule1["folder"]
assert returned_rule1["filter_from"] == rule1["filter_from"]
assert returned_rule1["filter_to"] == rule1["filter_to"]
assert returned_rule1["filter_subject"] == rule1["filter_subject"]
assert returned_rule1["filter_body"] == rule1["filter_body"]
assert (
returned_rule1["filter_attachment_filename_include"]
== rule1["filter_attachment_filename_include"]
)
self.assertEqual(returned_rule1["maximum_age"], rule1["maximum_age"])
self.assertEqual(returned_rule1["action"], rule1["action"])
self.assertEqual(
returned_rule1["assign_title_from"],
rule1["assign_title_from"],
assert returned_rule1["maximum_age"] == rule1["maximum_age"]
assert returned_rule1["action"] == rule1["action"]
assert returned_rule1["assign_title_from"] == rule1["assign_title_from"]
assert (
returned_rule1["assign_correspondent_from"]
== rule1["assign_correspondent_from"]
)
self.assertEqual(
returned_rule1["assign_correspondent_from"],
rule1["assign_correspondent_from"],
)
self.assertEqual(returned_rule1["order"], rule1["order"])
self.assertEqual(returned_rule1["attachment_type"], rule1["attachment_type"])
self.assertEqual(returned_rule1["action_parameter"], rule1["action_parameter"])
self.assertEqual(
returned_rule1["assign_correspondent"],
rule1["assign_correspondent"],
)
self.assertEqual(
returned_rule1["assign_document_type"],
rule1["assign_document_type"],
)
self.assertEqual(returned_rule1["assign_tags"], rule1["assign_tags"])
self.assertEqual(
returned_rule1["assign_owner_from_rule"],
rule1["assign_owner_from_rule"],
assert returned_rule1["order"] == rule1["order"]
assert returned_rule1["attachment_type"] == rule1["attachment_type"]
assert returned_rule1["action_parameter"] == rule1["action_parameter"]
assert returned_rule1["assign_correspondent"] == rule1["assign_correspondent"]
assert returned_rule1["assign_document_type"] == rule1["assign_document_type"]
assert returned_rule1["assign_tags"] == rule1["assign_tags"]
assert (
returned_rule1["assign_owner_from_rule"] == rule1["assign_owner_from_rule"]
)
def test_delete_mail_rule(self) -> None:
def test_delete_mail_rule(
self,
mail_api_client: APIClient,
) -> None:
"""
GIVEN:
- Existing mail rule
@@ -454,19 +457,21 @@ class TestAPIMailRules(DirectoriesMixin, APITestCase):
THEN:
- Rule is deleted
"""
account1 = MailAccountFactory()
rule1 = MailRuleFactory(account=account1)
response = self.client.delete(
f"{self.ENDPOINT}{rule1.pk}/",
response = mail_api_client.delete(
f"{MAIL_RULES_ENDPOINT}{rule1.pk}/",
)
self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT)
assert response.status_code == status.HTTP_204_NO_CONTENT
self.assertEqual(len(MailRule.objects.all()), 0)
assert MailRule.objects.count() == 0
def test_update_mail_rule(self) -> None:
def test_update_mail_rule(
self,
mail_api_client: APIClient,
) -> None:
"""
GIVEN:
- Existing mail rule
@@ -475,30 +480,33 @@ class TestAPIMailRules(DirectoriesMixin, APITestCase):
THEN:
- The mail rule is updated
"""
account1 = MailAccountFactory()
rule1 = MailRuleFactory(account=account1)
response = self.client.patch(
f"{self.ENDPOINT}{rule1.pk}/",
response = mail_api_client.patch(
f"{MAIL_RULES_ENDPOINT}{rule1.pk}/",
data={
"name": "Updated Name 1",
"action": MailRule.MailAction.DELETE,
},
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
assert response.status_code == status.HTTP_200_OK
returned_rule1 = MailRule.objects.get(pk=rule1.pk)
self.assertEqual(returned_rule1.name, "Updated Name 1")
self.assertEqual(returned_rule1.action, MailRule.MailAction.DELETE)
assert returned_rule1.name == "Updated Name 1"
assert returned_rule1.action == MailRule.MailAction.DELETE
def test_create_mail_rule_scopes_accounts(self) -> None:
other_user = User.objects.create_user(username="mail-owner")
def test_create_mail_rule_scopes_accounts(
self,
mail_api_client: APIClient,
django_user_model: type[User],
) -> None:
other_user = django_user_model.objects.create_user(username="mail-owner")
foreign_account = MailAccountFactory(name="ForeignEmail", owner=other_user)
response = self.client.post(
self.ENDPOINT,
response = mail_api_client.post(
MAIL_RULES_ENDPOINT,
data={
"name": "Rule1",
"account": foreign_account.pk,
@@ -512,8 +520,8 @@ class TestAPIMailRules(DirectoriesMixin, APITestCase):
"attachment_type": MailRule.AttachmentProcessing.ATTACHMENTS_ONLY,
},
)
missing_response = self.client.post(
self.ENDPOINT,
missing_response = mail_api_client.post(
MAIL_RULES_ENDPOINT,
data={
"name": "Rule1",
"account": foreign_account.pk + 1000,
@@ -528,21 +536,24 @@ class TestAPIMailRules(DirectoriesMixin, APITestCase):
},
)
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
self.assertEqual(missing_response.status_code, status.HTTP_400_BAD_REQUEST)
self.assertEqual(response.data["account"][0].code, "does_not_exist")
self.assertEqual(missing_response.data["account"][0].code, "does_not_exist")
self.assertEqual(MailRule.objects.count(), 0)
assert response.status_code == status.HTTP_400_BAD_REQUEST
assert missing_response.status_code == status.HTTP_400_BAD_REQUEST
assert response.data["account"][0].code == "does_not_exist"
assert missing_response.data["account"][0].code == "does_not_exist"
assert MailRule.objects.count() == 0
def test_create_mail_rule_allowed_for_granted_account_change_permission(
self,
mail_api_client: APIClient,
mail_api_user: User,
django_user_model: type[User],
) -> None:
other_user = User.objects.create_user(username="mail-owner")
other_user = django_user_model.objects.create_user(username="mail-owner")
foreign_account = MailAccountFactory(name="ForeignEmail", owner=other_user)
assign_perm("change_mailaccount", self.user, foreign_account)
assign_perm("change_mailaccount", mail_api_user, foreign_account)
response = self.client.post(
self.ENDPOINT,
response = mail_api_client.post(
MAIL_RULES_ENDPOINT,
data={
"name": "Rule1",
"account": foreign_account.pk,
@@ -557,25 +568,34 @@ class TestAPIMailRules(DirectoriesMixin, APITestCase):
},
)
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
self.assertEqual(MailRule.objects.get().account, foreign_account)
assert response.status_code == status.HTTP_201_CREATED
assert MailRule.objects.get().account == foreign_account
def test_update_mail_rule_forbidden_for_unpermitted_account(self) -> None:
def test_update_mail_rule_forbidden_for_unpermitted_account(
self,
mail_api_client: APIClient,
django_user_model: type[User],
) -> None:
own_account = MailAccountFactory()
other_user = User.objects.create_user(username="mail-owner")
other_user = django_user_model.objects.create_user(username="mail-owner")
foreign_account = MailAccountFactory(owner=other_user)
rule1 = MailRuleFactory(account=own_account)
response = self.client.patch(
f"{self.ENDPOINT}{rule1.pk}/",
response = mail_api_client.patch(
f"{MAIL_RULES_ENDPOINT}{rule1.pk}/",
data={"account": foreign_account.pk},
)
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
assert response.status_code == status.HTTP_400_BAD_REQUEST
rule1.refresh_from_db()
self.assertEqual(rule1.account, own_account)
assert rule1.account == own_account
def test_get_mail_rules_owner_aware(self) -> None:
def test_get_mail_rules_owner_aware(
self,
mail_api_client: APIClient,
mail_api_user: User,
django_user_model: type[User],
) -> None:
"""
GIVEN:
- Configured rules with different users
@@ -584,24 +604,26 @@ class TestAPIMailRules(DirectoriesMixin, APITestCase):
THEN:
- Only unowned, owned by user or granted mail rules are provided
"""
user2 = User.objects.create_user(username="temp_admin2")
user2 = django_user_model.objects.create_user(username="temp_admin2")
account1 = MailAccountFactory()
rule1 = MailRuleFactory(account=account1, order=0)
rule2 = MailRuleFactory(account=account1, order=1, owner=self.user)
rule2 = MailRuleFactory(account=account1, order=1, owner=mail_api_user)
MailRuleFactory(account=account1, order=2, owner=user2)
rule4 = MailRuleFactory(account=account1, order=3, owner=user2)
assign_perm("view_mailrule", self.user, rule4)
assign_perm("view_mailrule", mail_api_user, rule4)
response = self.client.get(self.ENDPOINT)
response = mail_api_client.get(MAIL_RULES_ENDPOINT)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(response.data["count"], 3)
self.assertEqual(response.data["results"][0]["name"], rule1.name)
self.assertEqual(response.data["results"][1]["name"], rule2.name)
self.assertEqual(response.data["results"][2]["name"], rule4.name)
assert response.status_code == status.HTTP_200_OK
assert response.data["count"] == 3
assert response.data["results"][0]["name"] == rule1.name
assert response.data["results"][1]["name"] == rule2.name
assert response.data["results"][2]["name"] == rule4.name
def test_mailrule_maxage_validation(self) -> None:
def test_mailrule_maxage_validation(
self,
mail_api_client: APIClient,
) -> None:
"""
GIVEN:
- An existing mail account
@@ -629,24 +651,24 @@ class TestAPIMailRules(DirectoriesMixin, APITestCase):
"attachment_type": MailRule.AttachmentProcessing.ATTACHMENTS_ONLY,
}
response = self.client.post(self.ENDPOINT, data=rule_data, format="json")
response = mail_api_client.post(
MAIL_RULES_ENDPOINT,
data=rule_data,
format="json",
)
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
self.assertIn("maximum_age", response.data)
assert response.status_code == status.HTTP_400_BAD_REQUEST
assert "maximum_age" in response.data
class TestAPIProcessedMails(DirectoriesMixin, APITestCase):
ENDPOINT = "/api/processed_mail/"
def setUp(self) -> None:
super().setUp()
self.user = User.objects.create_user(username="temp_admin")
self.user.user_permissions.add(*Permission.objects.all())
self.user.save()
self.client.force_authenticate(user=self.user)
def test_get_processed_mails_owner_aware(self) -> None:
@pytest.mark.django_db
class TestAPIProcessedMails:
def test_get_processed_mails_owner_aware(
self,
mail_api_client: APIClient,
mail_api_user: User,
django_user_model: type[User],
) -> None:
"""
GIVEN:
- Configured processed mails with different users
@@ -655,27 +677,31 @@ class TestAPIProcessedMails(DirectoriesMixin, APITestCase):
THEN:
- Only unowned, owned by user or granted processed mails are provided
"""
user2 = User.objects.create_user(username="temp_admin2")
user2 = django_user_model.objects.create_user(username="temp_admin2")
rule = MailRuleFactory()
pm1 = ProcessedMailFactory(rule=rule)
pm2 = ProcessedMailFactory(
rule=rule,
status="FAILED",
error="err",
owner=self.user,
owner=mail_api_user,
)
ProcessedMailFactory(rule=rule, owner=user2)
pm4 = ProcessedMailFactory(rule=rule, owner=user2)
assign_perm("view_processedmail", self.user, pm4)
assign_perm("view_processedmail", mail_api_user, pm4)
response = self.client.get(self.ENDPOINT)
response = mail_api_client.get(PROCESSED_MAIL_ENDPOINT)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(response.data["count"], 3)
assert response.status_code == status.HTTP_200_OK
assert response.data["count"] == 3
returned_ids = {r["id"] for r in response.data["results"]}
self.assertSetEqual(returned_ids, {pm1.id, pm2.id, pm4.id})
assert returned_ids == {pm1.id, pm2.id, pm4.id}
def test_get_processed_mails_filter_by_rule(self) -> None:
def test_get_processed_mails_filter_by_rule(
self,
mail_api_client: APIClient,
mail_api_user: User,
) -> None:
"""
GIVEN:
- Processed mails belonging to two different rules
@@ -687,17 +713,22 @@ class TestAPIProcessedMails(DirectoriesMixin, APITestCase):
account = MailAccountFactory()
rule1 = MailRuleFactory(account=account)
rule2 = MailRuleFactory(account=account)
pm1 = ProcessedMailFactory(rule=rule1, owner=self.user)
pm1 = ProcessedMailFactory(rule=rule1, owner=mail_api_user)
pm2 = ProcessedMailFactory(rule=rule1, status="FAILED", error="e")
ProcessedMailFactory(rule=rule2)
response = self.client.get(f"{self.ENDPOINT}?rule={rule1.pk}")
response = mail_api_client.get(f"{PROCESSED_MAIL_ENDPOINT}?rule={rule1.pk}")
self.assertEqual(response.status_code, status.HTTP_200_OK)
assert response.status_code == status.HTTP_200_OK
returned_ids = {r["id"] for r in response.data["results"]}
self.assertSetEqual(returned_ids, {pm1.id, pm2.id})
assert returned_ids == {pm1.id, pm2.id}
def test_bulk_delete_processed_mails(self) -> None:
def test_bulk_delete_processed_mails(
self,
mail_api_client: APIClient,
mail_api_user: User,
django_user_model: type[User],
) -> None:
"""
GIVEN:
- Processed mails belonging to two different rules and different users
@@ -706,7 +737,7 @@ class TestAPIProcessedMails(DirectoriesMixin, APITestCase):
THEN:
- Only the specified processed mails are deleted, respecting ownership and permissions
"""
user2 = User.objects.create_user(username="temp_admin2")
user2 = django_user_model.objects.create_user(username="temp_admin2")
rule = MailRuleFactory()
# unowned, owned by self, and one with explicit object perm
pm_unowned = ProcessedMailFactory(rule=rule)
@@ -714,46 +745,47 @@ class TestAPIProcessedMails(DirectoriesMixin, APITestCase):
rule=rule,
status="FAILED",
error="e",
owner=self.user,
owner=mail_api_user,
)
pm_granted = ProcessedMailFactory(rule=rule, owner=user2)
assign_perm("delete_processedmail", self.user, pm_granted)
assign_perm("delete_processedmail", mail_api_user, pm_granted)
pm_forbidden = ProcessedMailFactory(rule=rule, owner=user2)
# Success for allowed items
response = self.client.post(
f"{self.ENDPOINT}bulk_delete/",
response = mail_api_client.post(
PROCESSED_MAIL_BULK_DELETE_ENDPOINT,
data={
"mail_ids": [pm_unowned.id, pm_owned.id, pm_granted.id],
},
format="json",
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(response.data["result"], "OK")
self.assertSetEqual(
set(response.data["deleted_mail_ids"]),
{pm_unowned.id, pm_owned.id, pm_granted.id},
)
self.assertFalse(ProcessedMail.objects.filter(id=pm_unowned.id).exists())
self.assertFalse(ProcessedMail.objects.filter(id=pm_owned.id).exists())
self.assertFalse(ProcessedMail.objects.filter(id=pm_granted.id).exists())
self.assertTrue(ProcessedMail.objects.filter(id=pm_forbidden.id).exists())
assert response.status_code == status.HTTP_200_OK
assert response.data["result"] == "OK"
assert set(response.data["deleted_mail_ids"]) == {
pm_unowned.id,
pm_owned.id,
pm_granted.id,
}
assert not ProcessedMail.objects.filter(id=pm_unowned.id).exists()
assert not ProcessedMail.objects.filter(id=pm_owned.id).exists()
assert not ProcessedMail.objects.filter(id=pm_granted.id).exists()
assert ProcessedMail.objects.filter(id=pm_forbidden.id).exists()
# 403 and not deleted
response = self.client.post(
f"{self.ENDPOINT}bulk_delete/",
response = mail_api_client.post(
PROCESSED_MAIL_BULK_DELETE_ENDPOINT,
data={
"mail_ids": [pm_forbidden.id],
},
format="json",
)
self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
self.assertTrue(ProcessedMail.objects.filter(id=pm_forbidden.id).exists())
assert response.status_code == status.HTTP_403_FORBIDDEN
assert ProcessedMail.objects.filter(id=pm_forbidden.id).exists()
# missing mail_ids
response = self.client.post(
f"{self.ENDPOINT}bulk_delete/",
response = mail_api_client.post(
PROCESSED_MAIL_BULK_DELETE_ENDPOINT,
data={"mail_ids": "not-a-list"},
format="json",
)
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
assert response.status_code == status.HTTP_400_BAD_REQUEST
+319 -287
View File
@@ -1,13 +1,14 @@
from datetime import timedelta
from unittest import mock
import pytest
import pytest_mock
from django.contrib.auth.models import Permission
from django.contrib.auth.models import User
from django.test import TestCase
from django.test import override_settings
from django.test import Client
from django.utils import timezone
from httpx_oauth.oauth2 import GetAccessTokenError
from httpx_oauth.oauth2 import RefreshTokenError
from pytest_django.fixtures import SettingsWrapper
from rest_framework import status
from paperless_mail.mail import MailAccountHandler
@@ -16,340 +17,374 @@ from paperless_mail.oauth import PaperlessMailOAuth2Manager
from paperless_mail.tests.factories import MailAccountFactory
@override_settings(
OAUTH_CALLBACK_BASE_URL="http://localhost:8000",
GMAIL_OAUTH_CLIENT_ID="test_gmail_client_id",
GMAIL_OAUTH_CLIENT_SECRET="test_gmail_client_secret",
OUTLOOK_OAUTH_CLIENT_ID="test_outlook_client_id",
OUTLOOK_OAUTH_CLIENT_SECRET="test_outlook_client_secret",
)
class TestMailOAuth(
TestCase,
):
def setUp(self) -> None:
self.user = User.objects.create_user("testuser")
self.user.user_permissions.add(
*Permission.objects.filter(
codename__in=[
"add_mailaccount",
],
@pytest.fixture()
def oauth_manager() -> PaperlessMailOAuth2Manager:
return PaperlessMailOAuth2Manager()
@pytest.fixture()
def oauth_session(client: Client) -> Client:
"""Seed the test client session with a known oauth_state."""
session = client.session
session.update({"oauth_state": "test_state"})
session.save()
return client
class TestOAuthUrlGeneration:
"""OAuth callback / redirect URL construction by PaperlessMailOAuth2Manager."""
@pytest.mark.parametrize(
("overrides", "expected"),
[
pytest.param(
{"OAUTH_CALLBACK_BASE_URL": "http://paperless.example.com"},
"http://paperless.example.com/api/oauth/callback/",
id="callback-base-url-set",
),
)
self.user.save()
self.client.force_login(self.user)
self.mail_account_handler = MailAccountHandler()
super().setUp()
def test_generate_paths(self) -> None:
"""
GIVEN:
- Mocked settings for OAuth callback and base URLs
WHEN:
- get_oauth_callback_url and get_oauth_redirect_url are called
THEN:
- Correct URLs are generated
"""
# Callback URL
oauth_manager = PaperlessMailOAuth2Manager()
with override_settings(OAUTH_CALLBACK_BASE_URL="http://paperless.example.com"):
self.assertEqual(
oauth_manager.oauth_callback_url,
pytest.param(
{
"OAUTH_CALLBACK_BASE_URL": None,
"PAPERLESS_URL": "http://paperless.example.com",
},
"http://paperless.example.com/api/oauth/callback/",
)
with override_settings(
OAUTH_CALLBACK_BASE_URL=None,
PAPERLESS_URL="http://paperless.example.com",
):
self.assertEqual(
oauth_manager.oauth_callback_url,
"http://paperless.example.com/api/oauth/callback/",
)
with override_settings(
OAUTH_CALLBACK_BASE_URL=None,
PAPERLESS_URL="http://paperless.example.com",
BASE_URL="/paperless/",
):
self.assertEqual(
oauth_manager.oauth_callback_url,
id="falls-back-to-paperless-url",
),
pytest.param(
{
"OAUTH_CALLBACK_BASE_URL": None,
"PAPERLESS_URL": "http://paperless.example.com",
"BASE_URL": "/paperless/",
},
"http://paperless.example.com/paperless/api/oauth/callback/",
)
# Redirect URL
with override_settings(DEBUG=True):
self.assertEqual(
oauth_manager.oauth_redirect_url,
"http://localhost:4200/mail",
)
with override_settings(DEBUG=False):
self.assertEqual(
oauth_manager.oauth_redirect_url,
"/mail",
)
@mock.patch(
"paperless_mail.oauth.PaperlessMailOAuth2Manager.get_gmail_access_token",
id="respects-base-url-prefix",
),
],
)
@mock.patch(
"paperless_mail.oauth.PaperlessMailOAuth2Manager.get_outlook_access_token",
)
def test_oauth_callback_view_success(
def test_oauth_callback_url(
self,
mock_get_outlook_access_token,
mock_get_gmail_access_token,
settings: SettingsWrapper,
oauth_manager: PaperlessMailOAuth2Manager,
overrides: dict,
expected: str,
) -> None:
"""
GIVEN:
- Mocked settings for Gmail and Outlook OAuth client IDs and secrets
- Various combinations of OAUTH_CALLBACK_BASE_URL, PAPERLESS_URL, and BASE_URL
WHEN:
- OAuth callback is called with a code and scope
- OAuth callback is called with a code and no scope
- oauth_callback_url is read from the manager
THEN:
- Gmail mail account is created
- Outlook mail account is created
- The expected fully-qualified callback URL is produced
"""
for key, value in overrides.items():
setattr(settings, key, value)
assert oauth_manager.oauth_callback_url == expected
mock_get_gmail_access_token.return_value = {
"access_token": "test_access_token",
"refresh_token": "test_refresh_token",
"expires_in": 3600,
}
mock_get_outlook_access_token.return_value = {
"access_token": "test_access_token",
"refresh_token": "test_refresh_token",
"expires_in": 3600,
}
session = self.client.session
session.update(
{
"oauth_state": "test_state",
},
)
session.save()
# Test Google OAuth callback
response = self.client.get(
"/api/oauth/callback/?code=test_code&scope=https://mail.google.com/&state=test_state",
)
self.assertEqual(response.status_code, status.HTTP_302_FOUND)
self.assertIn("oauth_success=1", response.url)
mock_get_gmail_access_token.assert_called_once()
self.assertTrue(
MailAccount.objects.filter(imap_server="imap.gmail.com").exists(),
)
# Test Outlook OAuth callback
response = self.client.get(
"/api/oauth/callback/?code=test_code&state=test_state",
)
self.assertEqual(response.status_code, status.HTTP_302_FOUND)
self.assertIn("oauth_success=1", response.url)
self.assertTrue(
MailAccount.objects.filter(imap_server="outlook.office365.com").exists(),
)
@mock.patch("httpx_oauth.oauth2.BaseOAuth2.get_access_token")
def test_oauth_callback_view_fails(self, mock_get_access_token) -> None:
"""
GIVEN:
- Mocked settings for Gmail and Outlook OAuth client IDs and secrets
WHEN:
- OAuth callback is called and get access token returns an error
THEN:
- No mail account is created
- Error is logged
"""
mock_get_access_token.side_effect = GetAccessTokenError("test_error")
session = self.client.session
session.update(
{
"oauth_state": "test_state",
},
)
session.save()
with self.assertLogs("paperless_mail", level="ERROR") as cm:
# Test Google OAuth callback
response = self.client.get(
"/api/oauth/callback/?code=test_code&scope=https://mail.google.com/&state=test_state",
)
self.assertEqual(response.status_code, status.HTTP_302_FOUND)
self.assertIn("oauth_success=0", response.url)
self.assertFalse(
MailAccount.objects.filter(imap_server="imap.gmail.com").exists(),
)
# Test Outlook OAuth callback
response = self.client.get(
"/api/oauth/callback/?code=test_code&state=test_state",
)
self.assertEqual(response.status_code, status.HTTP_302_FOUND)
self.assertIn("oauth_success=0", response.url)
self.assertFalse(
MailAccount.objects.filter(
imap_server="outlook.office365.com",
).exists(),
)
self.assertIn(
"Error getting access token from OAuth provider",
cm.output[0],
)
def test_oauth_callback_view_insufficient_permissions(self) -> None:
"""
GIVEN:
- Mocked settings for Gmail and Outlook OAuth client IDs and secrets
- User without add_mailaccount permission
WHEN:
- OAuth callback is called
THEN:
- 400 bad request returned, no mail accounts are created
"""
self.user.user_permissions.remove(
*Permission.objects.filter(
codename__in=[
"add_mailaccount",
],
@pytest.mark.parametrize(
("debug", "expected"),
[
pytest.param(
True,
"http://localhost:4200/mail",
id="debug-redirects-to-ng-dev",
),
)
self.user.save()
response = self.client.get(
"/api/oauth/callback/?code=test_code&scope=https://mail.google.com/",
)
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
self.assertFalse(
MailAccount.objects.filter(imap_server="imap.gmail.com").exists(),
)
self.assertFalse(
MailAccount.objects.filter(imap_server="outlook.office365.com").exists(),
)
def test_oauth_callback_view_no_code(self) -> None:
pytest.param(False, "/mail", id="prod-redirects-to-relative-path"),
],
)
def test_oauth_redirect_url(
self,
settings: SettingsWrapper,
oauth_manager: PaperlessMailOAuth2Manager,
debug: bool, # noqa: FBT001
expected: str,
) -> None:
"""
GIVEN:
- Mocked settings for Gmail and Outlook OAuth client IDs and secrets
- DEBUG is toggled on or off
WHEN:
- OAuth callback is called without a code
- oauth_redirect_url is read from the manager
THEN:
- 400 bad request returned, no mail accounts are created
- In DEBUG mode the Angular dev server URL is returned, otherwise a relative path
"""
settings.DEBUG = debug
assert oauth_manager.oauth_redirect_url == expected
response = self.client.get(
"/api/oauth/callback/",
)
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
self.assertFalse(
MailAccount.objects.filter(imap_server="imap.gmail.com").exists(),
)
self.assertFalse(
MailAccount.objects.filter(imap_server="outlook.office365.com").exists(),
)
def test_oauth_callback_view_invalid_state(self) -> None:
@pytest.mark.django_db
class TestOAuthCallbackView:
"""End-to-end behavior of the /api/oauth/callback/ endpoint."""
def test_no_code(
self,
client: Client,
mail_user: User,
oauth_settings: SettingsWrapper,
) -> None:
"""
GIVEN:
- Mocked settings for Gmail and Outlook OAuth client IDs and secrets
- OAuth client IDs and secrets configured
WHEN:
- OAuth callback is called with an invalid state
- The OAuth callback is called without a code parameter
THEN:
- 400 bad request returned, no mail accounts are created
- 400 Bad Request is returned and no mail account is created
"""
response = client.get("/api/oauth/callback/")
assert response.status_code == status.HTTP_400_BAD_REQUEST
assert not MailAccount.objects.filter(imap_server="imap.gmail.com").exists()
assert not MailAccount.objects.filter(
imap_server="outlook.office365.com",
).exists()
response = self.client.get(
def test_invalid_state(
self,
client: Client,
mail_user: User,
oauth_settings: SettingsWrapper,
) -> None:
"""
GIVEN:
- OAuth client IDs and secrets configured
WHEN:
- The OAuth callback is called with a state that does not match the session
THEN:
- 400 Bad Request is returned and no mail account is created
"""
response = client.get(
"/api/oauth/callback/?code=test_code&state=invalid_state",
)
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
self.assertFalse(
MailAccount.objects.filter(imap_server="imap.gmail.com").exists(),
)
self.assertFalse(
MailAccount.objects.filter(imap_server="outlook.office365.com").exists(),
)
assert response.status_code == status.HTTP_400_BAD_REQUEST
assert not MailAccount.objects.filter(imap_server="imap.gmail.com").exists()
assert not MailAccount.objects.filter(
imap_server="outlook.office365.com",
).exists()
@mock.patch("paperless_mail.mail.get_mailbox")
@mock.patch(
"httpx_oauth.oauth2.BaseOAuth2.refresh_token",
)
def test_refresh_token_on_handle_mail_account(
def test_insufficient_permissions(
self,
mock_refresh_token,
mock_get_mailbox,
client: Client,
mail_user: User,
oauth_settings: SettingsWrapper,
) -> None:
"""
GIVEN:
- Mail account with refresh token and expiration
- OAuth client IDs and secrets configured
- User without add_mailaccount permission
WHEN:
- handle_mail_account is called
- The OAuth callback is called
THEN:
- Refresh token is called
- 400 Bad Request is returned and no mail account is created
"""
mock_mailbox = mock.MagicMock()
mock_get_mailbox.return_value.__enter__.return_value = mock_mailbox
mail_account = MailAccountFactory(
name="Test Gmail Mail Account",
username="test_username",
account_type=MailAccount.MailAccountType.GMAIL_OAUTH,
is_token=True,
refresh_token="test_refresh_token",
expiration=timezone.now() - timedelta(days=1),
mail_user.user_permissions.remove(
*Permission.objects.filter(codename__in=["add_mailaccount"]),
)
mail_user.save()
mock_refresh_token.return_value = {
response = client.get(
"/api/oauth/callback/?code=test_code&scope=https://mail.google.com/",
)
assert response.status_code == status.HTTP_400_BAD_REQUEST
assert not MailAccount.objects.filter(imap_server="imap.gmail.com").exists()
assert not MailAccount.objects.filter(
imap_server="outlook.office365.com",
).exists()
@pytest.mark.parametrize(
("provider", "callback_query", "expected_imap"),
[
pytest.param(
"gmail",
"code=test_code&scope=https://mail.google.com/&state=test_state",
"imap.gmail.com",
id="gmail",
),
pytest.param(
"outlook",
"code=test_code&state=test_state",
"outlook.office365.com",
id="outlook",
),
],
)
def test_success(
self,
client: Client,
mail_user: User,
oauth_settings: SettingsWrapper,
oauth_session: Client,
mocker: pytest_mock.MockerFixture,
provider: str,
callback_query: str,
expected_imap: str,
) -> None:
"""
GIVEN:
- OAuth client IDs and secrets configured for Gmail and Outlook
- A valid oauth_state seeded in the session
WHEN:
- The OAuth callback is called with a code and provider-specific scope
THEN:
- A redirect with oauth_success=1 is returned
- The provider's access-token method is invoked
- A mail account for the matching provider is created
"""
token_payload = {
"access_token": "test_access_token",
"refresh_token": "test_refresh_token",
"expires_in": 3600,
}
target = (
"paperless_mail.oauth.PaperlessMailOAuth2Manager.get_gmail_access_token"
if provider == "gmail"
else "paperless_mail.oauth.PaperlessMailOAuth2Manager.get_outlook_access_token"
)
mocked = mocker.patch(target, return_value=token_payload)
self.mail_account_handler.handle_mail_account(mail_account)
mock_refresh_token.assert_called_once()
mock_refresh_token.reset_mock()
response = client.get(f"/api/oauth/callback/?{callback_query}")
mock_refresh_token.return_value = {
"access_token": "test_access_token",
"refresh_token": "test_refresh",
"expires_in": 3600,
}
outlook_mail_account = MailAccountFactory(
name="Test Outlook Mail Account",
assert response.status_code == status.HTTP_302_FOUND
assert "oauth_success=1" in response.url
mocked.assert_called_once()
assert MailAccount.objects.filter(imap_server=expected_imap).exists()
@pytest.mark.parametrize(
("callback_query", "imap_server"),
[
pytest.param(
"code=test_code&scope=https://mail.google.com/&state=test_state",
"imap.gmail.com",
id="gmail",
),
pytest.param(
"code=test_code&state=test_state",
"outlook.office365.com",
id="outlook",
),
],
)
def test_provider_error(
self,
client: Client,
mail_user: User,
oauth_settings: SettingsWrapper,
oauth_session: Client,
mocker: pytest_mock.MockerFixture,
caplog: pytest.LogCaptureFixture,
callback_query: str,
imap_server: str,
) -> None:
"""
GIVEN:
- OAuth client IDs and secrets configured
- The provider's access-token endpoint raises GetAccessTokenError
WHEN:
- The OAuth callback is called with a code (Gmail or Outlook)
THEN:
- A redirect with oauth_success=0 is returned
- No mail account is created
- The failure is logged at ERROR level
"""
mocker.patch(
"httpx_oauth.oauth2.BaseOAuth2.get_access_token",
side_effect=GetAccessTokenError("test_error"),
)
with caplog.at_level("ERROR", logger="paperless_mail"):
response = client.get(f"/api/oauth/callback/?{callback_query}")
assert response.status_code == status.HTTP_302_FOUND
assert "oauth_success=0" in response.url
assert not MailAccount.objects.filter(imap_server=imap_server).exists()
assert any(
"Error getting access token from OAuth provider" in record.message
for record in caplog.records
)
@pytest.mark.django_db
class TestRefreshTokenOnHandleMailAccount:
"""OAuth refresh-token flow exercised through MailAccountHandler.handle_mail_account."""
@pytest.mark.parametrize(
("account_type", "name"),
[
pytest.param(
MailAccount.MailAccountType.GMAIL_OAUTH,
"Test Gmail",
id="gmail",
),
pytest.param(
MailAccount.MailAccountType.OUTLOOK_OAUTH,
"Test Outlook",
id="outlook",
),
],
)
def test_refresh_token_called(
self,
mocker: pytest_mock.MockerFixture,
mail_account_handler: MailAccountHandler,
account_type: MailAccount.MailAccountType,
name: str,
) -> None:
"""
GIVEN:
- An OAuth-backed mail account with a refresh token and an expired access token
WHEN:
- handle_mail_account is called
THEN:
- The OAuth refresh_token endpoint is invoked exactly once
"""
mock_mailbox = mocker.MagicMock()
mocker.patch(
"paperless_mail.mail.get_mailbox",
).return_value.__enter__.return_value = mock_mailbox
mock_refresh = mocker.patch(
"httpx_oauth.oauth2.BaseOAuth2.refresh_token",
return_value={
"access_token": "test_access_token",
"refresh_token": "test_refresh_token",
"expires_in": 3600,
},
)
account = MailAccountFactory(
name=name,
username="test_username",
account_type=MailAccount.MailAccountType.OUTLOOK_OAUTH,
account_type=account_type,
is_token=True,
refresh_token="test_refresh_token",
expiration=timezone.now() - timedelta(days=1),
)
self.mail_account_handler.handle_mail_account(outlook_mail_account)
mock_refresh_token.assert_called_once()
mail_account_handler.handle_mail_account(account)
mock_refresh.assert_called_once()
@mock.patch("paperless_mail.mail.get_mailbox")
@mock.patch(
"httpx_oauth.oauth2.BaseOAuth2.refresh_token",
)
def test_refresh_token_on_handle_mail_account_fails(
def test_refresh_token_failure(
self,
mock_refresh_token,
mock_get_mailbox,
mocker: pytest_mock.MockerFixture,
caplog: pytest.LogCaptureFixture,
mail_account_handler: MailAccountHandler,
) -> None:
"""
GIVEN:
- Mail account with refresh token and expiration
- An OAuth-backed mail account with a refresh token and an expired access token
- The OAuth refresh_token endpoint raises RefreshTokenError
WHEN:
- handle_mail_account is called
- Refresh token is called but fails
THEN:
- Error is logged
- 0 processed mails is returned
- The failure is logged at ERROR level with the account context
"""
mock_mailbox = mocker.MagicMock()
mocker.patch(
"paperless_mail.mail.get_mailbox",
).return_value.__enter__.return_value = mock_mailbox
mock_refresh = mocker.patch(
"httpx_oauth.oauth2.BaseOAuth2.refresh_token",
side_effect=RefreshTokenError("test_error"),
)
mock_mailbox = mock.MagicMock()
mock_get_mailbox.return_value.__enter__.return_value = mock_mailbox
mail_account = MailAccountFactory(
account = MailAccountFactory(
name="Test Gmail Mail Account",
username="test_username",
account_type=MailAccount.MailAccountType.GMAIL_OAUTH,
@@ -358,16 +393,13 @@ class TestMailOAuth(
expiration=timezone.now() - timedelta(days=1),
)
mock_refresh_token.side_effect = RefreshTokenError("test_error")
with caplog.at_level("ERROR", logger="paperless_mail"):
result = mail_account_handler.handle_mail_account(account)
with self.assertLogs("paperless_mail", level="ERROR") as cm:
# returns 0 processed mails
self.assertEqual(
self.mail_account_handler.handle_mail_account(mail_account),
0,
)
mock_refresh_token.assert_called_once()
self.assertIn(
f"Failed to refresh oauth token for account {mail_account}: test_error",
cm.output[0],
)
assert result == 0
mock_refresh.assert_called_once()
assert any(
f"Failed to refresh oauth token for account {account}: test_error"
in record.message
for record in caplog.records
)
+154 -126
View File
@@ -1,28 +1,31 @@
import email
import email.contentmanager
import shutil
import subprocess
import tempfile
from collections.abc import Generator
from email.message import Message
from email.mime.application import MIMEApplication
from email.mime.multipart import MIMEMultipart
from unittest import mock
from pathlib import Path
import gnupg
from django.test import override_settings
import pytest
from imap_tools import MailMessage
from paperless_mail.mail import MailAccountHandler
from paperless_mail.models import MailRule
from paperless_mail.preprocessor import MailMessageDecryptor
from paperless_mail.tests.factories import MailAccountFactory
from paperless_mail.tests.test_mail import TestMail
from paperless_mail.tests.test_mail import _AttachmentDef
class MessageEncryptor:
def __init__(self) -> None:
self.gpg_home = tempfile.mkdtemp()
"""
Test helper: generates a throwaway GPG keypair in a tempdir and exposes
`encrypt(MailMessage) -> MailMessage`.
"""
def __init__(self, gpg_home: Path) -> None:
self.gpg_home = str(gpg_home)
self.gpg = gnupg.GPG(gnupghome=self.gpg_home)
self._testUser = "testuser@example.com"
# Generate a new key
@@ -36,9 +39,9 @@ class MessageEncryptor:
)
self.gpg.gen_key(input_data)
def cleanup(self) -> None:
def kill_agent(self) -> None:
"""
Kill the gpg-agent process and clean up the temporary GPG home directory.
Kill the gpg-agent so pytest can remove the GPG home.
This uses gpgconf to properly terminate the agent, which is the officially
recommended cleanup method from the GnuPG project. python-gnupg does not
@@ -57,9 +60,6 @@ class MessageEncryptor:
# gpgconf not found or hung - agent will timeout eventually
pass
# Clean up the temporary directory
shutil.rmtree(self.gpg_home, ignore_errors=True)
@staticmethod
def get_email_body_without_headers(email_message: Message) -> bytes:
"""
@@ -74,7 +74,7 @@ class MessageEncryptor:
]
return message_copy.as_bytes()
def encrypt(self, message):
def encrypt(self, message) -> MailMessage:
original_email: email.message.Message = message.obj
encrypted_data = self.gpg.encrypt(
self.get_email_body_without_headers(original_email),
@@ -104,152 +104,181 @@ class MessageEncryptor:
)
new_email.attach(encrypted_part)
encrypted_message: MailMessage = MailMessage(
return MailMessage(
[(f"UID {message.uid}".encode(), new_email.as_bytes())],
)
return encrypted_message
class TestMailMessageGpgDecryptor(TestMail):
@classmethod
def setUpClass(cls) -> None:
"""Create GPG encryptor once for all tests in this class."""
super().setUpClass()
cls.messageEncryptor = MessageEncryptor()
@pytest.fixture(scope="session")
def message_encryptor(
tmp_path_factory: pytest.TempPathFactory,
) -> Generator[MessageEncryptor, None, None]:
"""
Session-scoped: GPG keypair generation is slow (~1s+), and nothing in
these tests mutates the keyring after creation. The GPG home directory
comes from `tmp_path_factory` so pytest cleans it up at session end;
we still kill the gpg-agent ourselves so the dir is removable.
"""
gpg_home = tmp_path_factory.mktemp("gpg-home")
encryptor = MessageEncryptor(gpg_home)
yield encryptor
encryptor.kill_agent()
@classmethod
def tearDownClass(cls) -> None:
"""Clean up GPG resources after all tests complete."""
if hasattr(cls, "messageEncryptor"):
cls.messageEncryptor.cleanup()
super().tearDownClass()
def setUp(self) -> None:
with override_settings(
EMAIL_GNUPG_HOME=self.messageEncryptor.gpg_home,
EMAIL_ENABLE_GPG_DECRYPTOR=True,
):
super().setUp()
@pytest.fixture()
def gpg_settings(settings, message_encryptor: MessageEncryptor):
settings.EMAIL_GNUPG_HOME = message_encryptor.gpg_home
settings.EMAIL_ENABLE_GPG_DECRYPTOR = True
return settings
def test_preprocessor_is_able_to_run(self) -> None:
with override_settings(
EMAIL_GNUPG_HOME=self.messageEncryptor.gpg_home,
EMAIL_ENABLE_GPG_DECRYPTOR=True,
):
self.assertTrue(MailMessageDecryptor.able_to_run())
def test_preprocessor_is_able_to_run2(self) -> None:
with override_settings(
EMAIL_GNUPG_HOME=None,
EMAIL_ENABLE_GPG_DECRYPTOR=True,
):
self.assertTrue(MailMessageDecryptor.able_to_run())
@pytest.fixture()
def encrypted_pair(mail_mocker, message_encryptor: MessageEncryptor):
"""
Build a (encrypted, plaintext) MailMessage pair sharing the same UID and
headers, with two PDF attachments on the plaintext side.
"""
plaintext = mail_mocker.messageBuilder.create_message(
body="Test message with 2 attachments",
attachments=[
_AttachmentDef(filename="f1.pdf", disposition="inline"),
_AttachmentDef(filename="f2.pdf"),
],
)
encrypted = message_encryptor.encrypt(plaintext)
return encrypted, plaintext
def test_is_not_able_to_run_disabled(self) -> None:
with override_settings(
EMAIL_ENABLE_GPG_DECRYPTOR=False,
):
self.assertFalse(MailMessageDecryptor.able_to_run())
def test_is_not_able_to_run_bogus_path(self) -> None:
with override_settings(
EMAIL_ENABLE_GPG_DECRYPTOR=True,
EMAIL_GNUPG_HOME="_)@# notapath &%#$",
):
self.assertFalse(MailMessageDecryptor.able_to_run())
class TestMailMessageDecryptorAbleToRun:
"""`MailMessageDecryptor.able_to_run()` configuration matrix — no DB needed."""
def test_fails_at_initialization(self) -> None:
with (
mock.patch("gnupg.GPG.__init__") as mock_run,
override_settings(
EMAIL_ENABLE_GPG_DECRYPTOR=True,
@pytest.mark.parametrize(
("settings_overrides", "expected"),
[
pytest.param(
{
"EMAIL_GNUPG_HOME": "_gpg_home_marker",
"EMAIL_ENABLE_GPG_DECRYPTOR": True,
},
True,
id="enabled-with-valid-home",
),
):
pytest.param(
{"EMAIL_GNUPG_HOME": None, "EMAIL_ENABLE_GPG_DECRYPTOR": True},
True,
id="enabled-with-default-home",
),
pytest.param(
{"EMAIL_ENABLE_GPG_DECRYPTOR": False},
False,
id="disabled",
),
pytest.param(
{
"EMAIL_ENABLE_GPG_DECRYPTOR": True,
"EMAIL_GNUPG_HOME": "_)@# notapath &%#$",
},
False,
id="enabled-with-bogus-path",
),
],
)
def test_able_to_run(
self,
settings,
message_encryptor: MessageEncryptor,
settings_overrides: dict,
*,
expected: bool,
) -> None:
for key, value in settings_overrides.items():
if value == "_gpg_home_marker":
value = message_encryptor.gpg_home
setattr(settings, key, value)
assert MailMessageDecryptor.able_to_run() is expected
def side_effect(*args, **kwargs):
raise OSError("Cannot find 'gpg' binary")
mock_run.side_effect = side_effect
@pytest.mark.django_db
class TestMailMessageDecryptor:
"""End-to-end decrypt and consumption flow with a real GPG keyring."""
handler = MailAccountHandler()
self.assertEqual(len(handler._message_preprocessors), 0)
def test_fails_at_initialization(self, settings, mocker) -> None:
settings.EMAIL_ENABLE_GPG_DECRYPTOR = True
mocker.patch(
"gnupg.GPG.__init__",
side_effect=OSError("Cannot find 'gpg' binary"),
)
def test_decrypt_fails(self) -> None:
encrypted_message, _ = self.create_encrypted_unencrypted_message_pair()
# This test creates its own empty GPG home to test decryption failure
empty_gpg_home = tempfile.mkdtemp()
handler = MailAccountHandler()
assert len(handler._message_preprocessors) == 0
def test_decrypt_fails(self, settings, encrypted_pair, tmp_path: Path) -> None:
"""
A decryptor pointed at a fresh empty GPG home cannot decrypt the
message — ensure it surfaces an exception rather than silently passing
bytes through.
"""
encrypted_message, _ = encrypted_pair
empty_gpg_home = tmp_path / "empty-gpg-home"
empty_gpg_home.mkdir()
settings.EMAIL_ENABLE_GPG_DECRYPTOR = True
settings.EMAIL_GNUPG_HOME = str(empty_gpg_home)
decryptor = MailMessageDecryptor()
try:
with override_settings(
EMAIL_ENABLE_GPG_DECRYPTOR=True,
EMAIL_GNUPG_HOME=empty_gpg_home,
):
message_decryptor = MailMessageDecryptor()
self.assertRaises(Exception, message_decryptor.run, encrypted_message)
with pytest.raises(Exception):
decryptor.run(encrypted_message)
finally:
# Clean up the temporary GPG home used only by this test
try:
subprocess.run(
["gpgconf", "--kill", "gpg-agent"],
env={"GNUPGHOME": empty_gpg_home},
env={"GNUPGHOME": str(empty_gpg_home)},
check=False,
capture_output=True,
timeout=5,
)
except (FileNotFoundError, subprocess.TimeoutExpired):
pass
shutil.rmtree(empty_gpg_home, ignore_errors=True)
def test_decrypt_encrypted_mail(self) -> None:
def test_decrypt_encrypted_mail(self, gpg_settings, encrypted_pair) -> None:
"""
Creates a mail with attachments. Then encrypts it with a new key.
Verifies that this encrypted message can be decrypted with attachments intact.
"""
encrypted_message, message = self.create_encrypted_unencrypted_message_pair()
headers = message.headers
text = message.text
encrypted_message, plaintext = encrypted_pair
headers = plaintext.headers
text = plaintext.text
self.assertEqual(len(encrypted_message.attachments), 1)
self.assertEqual(encrypted_message.attachments[0].filename, "encrypted.asc")
self.assertEqual(encrypted_message.text, "")
assert len(encrypted_message.attachments) == 1
assert encrypted_message.attachments[0].filename == "encrypted.asc"
assert encrypted_message.text == ""
with override_settings(
EMAIL_ENABLE_GPG_DECRYPTOR=True,
EMAIL_GNUPG_HOME=self.messageEncryptor.gpg_home,
):
message_decryptor = MailMessageDecryptor()
self.assertTrue(message_decryptor.able_to_run())
decrypted_message = message_decryptor.run(encrypted_message)
decryptor = MailMessageDecryptor()
assert decryptor.able_to_run()
decrypted = decryptor.run(encrypted_message)
self.assertEqual(len(decrypted_message.attachments), 2)
self.assertEqual(decrypted_message.attachments[0].filename, "f1.pdf")
self.assertEqual(decrypted_message.attachments[1].filename, "f2.pdf")
self.assertEqual(decrypted_message.headers, headers)
self.assertEqual(decrypted_message.text, text)
self.assertEqual(decrypted_message.uid, message.uid)
assert len(decrypted.attachments) == 2
assert decrypted.attachments[0].filename == "f1.pdf"
assert decrypted.attachments[1].filename == "f2.pdf"
assert decrypted.headers == headers
assert decrypted.text == text
assert decrypted.uid == plaintext.uid
def create_encrypted_unencrypted_message_pair(self):
message = self.mailMocker.messageBuilder.create_message(
body="Test message with 2 attachments",
attachments=[
_AttachmentDef(
filename="f1.pdf",
disposition="inline",
),
_AttachmentDef(filename="f2.pdf"),
],
)
encrypted_message = self.messageEncryptor.encrypt(message)
return encrypted_message, message
def test_handle_encrypted_message(self) -> None:
message = self.mailMocker.messageBuilder.create_message(
def test_handle_encrypted_message(
self,
gpg_settings,
mail_mocker,
message_encryptor: MessageEncryptor,
) -> None:
plaintext = mail_mocker.messageBuilder.create_message(
subject="the message title",
from_="Myself",
attachments=2,
body="Test mail",
)
encrypted_message = self.messageEncryptor.encrypt(message)
encrypted = message_encryptor.encrypt(plaintext)
account = MailAccountFactory()
rule = MailRule(
@@ -259,18 +288,17 @@ class TestMailMessageGpgDecryptor(TestMail):
)
rule.save()
result = self.mail_account_handler._handle_message(encrypted_message, rule)
handler = MailAccountHandler()
result = handler._handle_message(encrypted, rule)
self.assertEqual(result, 3)
self.mailMocker._queue_consumption_tasks_mock.assert_called()
self.mailMocker.assert_queue_consumption_tasks_call_args(
assert result == 3
mail_mocker._queue_consumption_tasks_mock.assert_called()
mail_mocker.assert_queue_consumption_tasks_call_args(
[
[
{
"override_title": message.subject,
"override_filename": f"{message.subject}.eml",
"override_title": plaintext.subject,
"override_filename": f"{plaintext.subject}.eml",
},
],
[