mirror of
https://github.com/paperless-ngx/paperless-ngx.git
synced 2026-05-07 23:25:25 +00:00
Compare commits
8 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 0d3f2ffb75 | |||
| 4942ab1000 | |||
| 9e17c55a9a | |||
| 42da2e2fd6 | |||
| 4872bfbeae | |||
| e40cd6048d | |||
| 56ebc08a65 | |||
| b7d466c242 |
@@ -1,11 +1,19 @@
|
||||
from collections.abc import Generator
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
import pytest
|
||||
import pytest_mock
|
||||
from django.contrib.auth.models import User
|
||||
from django.test import Client
|
||||
from rest_framework.test import APIClient
|
||||
|
||||
from paperless_mail.mail import MailAccountHandler
|
||||
from paperless_mail.models import MailAccount
|
||||
from paperless_mail.tests.factories import MailAccountFactory
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from paperless_mail.tests.test_mail import BogusMailBox
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def greenmail_mail_account(db: None) -> Generator[MailAccount, None, None]:
|
||||
@@ -27,3 +35,103 @@ def greenmail_mail_account(db: None) -> Generator[MailAccount, None, None]:
|
||||
@pytest.fixture()
|
||||
def mail_account_handler() -> MailAccountHandler:
|
||||
return MailAccountHandler()
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def mail_user(
|
||||
db: None,
|
||||
django_user_model,
|
||||
client: Client,
|
||||
):
|
||||
"""
|
||||
Create a user with the `add_mailaccount` permission and log them in via
|
||||
the test client. Returned so tests can mutate permissions if needed.
|
||||
"""
|
||||
from django.contrib.auth.models import Permission
|
||||
|
||||
user = django_user_model.objects.create_user("testuser")
|
||||
user.user_permissions.add(
|
||||
*Permission.objects.filter(codename__in=["add_mailaccount"]),
|
||||
)
|
||||
user.save()
|
||||
client.force_login(user)
|
||||
return user
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def oauth_settings(settings):
|
||||
"""
|
||||
Apply the OAuth callback / client-id settings the OAuth flow needs. Uses
|
||||
pytest-django's `settings` fixture so values are reverted automatically.
|
||||
"""
|
||||
settings.OAUTH_CALLBACK_BASE_URL = "http://localhost:8000"
|
||||
settings.GMAIL_OAUTH_CLIENT_ID = "test_gmail_client_id"
|
||||
settings.GMAIL_OAUTH_CLIENT_SECRET = "test_gmail_client_secret"
|
||||
settings.OUTLOOK_OAUTH_CLIENT_ID = "test_outlook_client_id"
|
||||
settings.OUTLOOK_OAUTH_CLIENT_SECRET = "test_outlook_client_secret"
|
||||
return settings
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def mail_mocker(db: None):
|
||||
"""
|
||||
Provides a MailMocker instance with its `MailBox` and
|
||||
`queue_consumption_tasks` patches active. Cleanups registered via
|
||||
TestCase.addCleanup are run on teardown by calling doCleanups().
|
||||
"""
|
||||
from paperless_mail.tests.test_mail import MailMocker
|
||||
|
||||
mocker = MailMocker()
|
||||
mocker.setUp()
|
||||
try:
|
||||
yield mocker
|
||||
finally:
|
||||
mocker.doCleanups()
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def mail_api_user(
|
||||
db: None,
|
||||
django_user_model: type[User],
|
||||
) -> User:
|
||||
"""
|
||||
Fully-permissioned (regular) user used by the mail API tests.
|
||||
|
||||
Has every model-level permission but is NOT a Django superuser/staff:
|
||||
the owner-aware filtering and bulk_delete permission tests rely on
|
||||
django-guardian's object-level checks, and `is_superuser` short-circuits
|
||||
those checks. The name avoids `admin` to make this distinction explicit.
|
||||
"""
|
||||
from django.contrib.auth.models import Permission
|
||||
|
||||
user = django_user_model.objects.create_user(username="mail_api_user")
|
||||
user.user_permissions.add(*Permission.objects.all())
|
||||
user.save()
|
||||
return user
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def mail_api_client(mail_api_user: User) -> APIClient:
|
||||
"""
|
||||
DRF APIClient force-authenticated as `mail_api_user` and pinned to API v10
|
||||
via the Accept header (matches `documents/tests/conftest.py:admin_client`).
|
||||
"""
|
||||
client = APIClient()
|
||||
client.force_authenticate(user=mail_api_user)
|
||||
client.credentials(HTTP_ACCEPT="application/json; version=10")
|
||||
return client
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def bogus_mailbox(mocker: pytest_mock.MockerFixture) -> "BogusMailBox":
|
||||
"""
|
||||
Patch `paperless_mail.mail.MailBox` with a `BogusMailBox` instance so the
|
||||
`/api/mail_accounts/test/` endpoint can run without a real IMAP server.
|
||||
Returns the bogus mailbox so tests can introspect/manipulate it.
|
||||
"""
|
||||
from paperless_mail.tests.test_mail import BogusMailBox
|
||||
|
||||
mailbox = BogusMailBox()
|
||||
mock_mailbox_cls = mocker.patch("paperless_mail.mail.MailBox")
|
||||
mock_mailbox_cls.return_value = mailbox
|
||||
return mailbox
|
||||
|
||||
@@ -1,44 +1,39 @@
|
||||
import json
|
||||
from unittest import mock
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from django.contrib.auth.models import Permission
|
||||
import pytest
|
||||
from django.contrib.auth.models import User
|
||||
from guardian.shortcuts import assign_perm
|
||||
from rest_framework import status
|
||||
from rest_framework.test import APITestCase
|
||||
from rest_framework.test import APIClient
|
||||
|
||||
from documents.tests.factories import CorrespondentFactory
|
||||
from documents.tests.factories import DocumentTypeFactory
|
||||
from documents.tests.factories import TagFactory
|
||||
from documents.tests.utils import DirectoriesMixin
|
||||
from paperless_mail.models import MailAccount
|
||||
from paperless_mail.models import MailRule
|
||||
from paperless_mail.models import ProcessedMail
|
||||
from paperless_mail.tests.factories import MailAccountFactory
|
||||
from paperless_mail.tests.factories import MailRuleFactory
|
||||
from paperless_mail.tests.factories import ProcessedMailFactory
|
||||
from paperless_mail.tests.test_mail import BogusMailBox
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from paperless_mail.tests.test_mail import BogusMailBox
|
||||
|
||||
|
||||
class TestAPIMailAccounts(DirectoriesMixin, APITestCase):
|
||||
ENDPOINT = "/api/mail_accounts/"
|
||||
MAIL_ACCOUNTS_ENDPOINT = "/api/mail_accounts/"
|
||||
MAIL_ACCOUNTS_TEST_ENDPOINT = f"{MAIL_ACCOUNTS_ENDPOINT}test/"
|
||||
MAIL_RULES_ENDPOINT = "/api/mail_rules/"
|
||||
PROCESSED_MAIL_ENDPOINT = "/api/processed_mail/"
|
||||
PROCESSED_MAIL_BULK_DELETE_ENDPOINT = f"{PROCESSED_MAIL_ENDPOINT}bulk_delete/"
|
||||
|
||||
def setUp(self) -> None:
|
||||
self.bogus_mailbox = BogusMailBox()
|
||||
|
||||
patcher = mock.patch("paperless_mail.mail.MailBox")
|
||||
m = patcher.start()
|
||||
m.return_value = self.bogus_mailbox
|
||||
self.addCleanup(patcher.stop)
|
||||
|
||||
super().setUp()
|
||||
|
||||
self.user = User.objects.create_user(username="temp_admin")
|
||||
self.user.user_permissions.add(*Permission.objects.all())
|
||||
self.user.save()
|
||||
self.client.force_authenticate(user=self.user)
|
||||
|
||||
def test_get_mail_accounts(self) -> None:
|
||||
@pytest.mark.django_db
|
||||
class TestAPIMailAccounts:
|
||||
def test_get_mail_accounts(
|
||||
self,
|
||||
mail_api_client: APIClient,
|
||||
) -> None:
|
||||
"""
|
||||
GIVEN:
|
||||
- Configured mail accounts
|
||||
@@ -47,7 +42,6 @@ class TestAPIMailAccounts(DirectoriesMixin, APITestCase):
|
||||
THEN:
|
||||
- Configured mail accounts are provided
|
||||
"""
|
||||
|
||||
account1 = MailAccountFactory(
|
||||
name="Email1",
|
||||
username="username1",
|
||||
@@ -56,31 +50,30 @@ class TestAPIMailAccounts(DirectoriesMixin, APITestCase):
|
||||
imap_port=443,
|
||||
)
|
||||
|
||||
response = self.client.get(self.ENDPOINT)
|
||||
response = mail_api_client.get(MAIL_ACCOUNTS_ENDPOINT)
|
||||
|
||||
self.assertEqual(response.status_code, status.HTTP_200_OK)
|
||||
self.assertEqual(response.data["count"], 1)
|
||||
assert response.status_code == status.HTTP_200_OK
|
||||
assert response.data["count"] == 1
|
||||
returned_account1 = response.data["results"][0]
|
||||
|
||||
self.assertEqual(returned_account1["name"], account1.name)
|
||||
self.assertEqual(returned_account1["username"], account1.username)
|
||||
self.assertEqual(
|
||||
returned_account1["password"],
|
||||
"**********",
|
||||
)
|
||||
self.assertEqual(returned_account1["imap_server"], account1.imap_server)
|
||||
self.assertEqual(returned_account1["imap_port"], account1.imap_port)
|
||||
self.assertEqual(returned_account1["imap_security"], account1.imap_security)
|
||||
self.assertEqual(returned_account1["character_set"], account1.character_set)
|
||||
assert returned_account1["name"] == account1.name
|
||||
assert returned_account1["username"] == account1.username
|
||||
assert returned_account1["password"] == "**********"
|
||||
assert returned_account1["imap_server"] == account1.imap_server
|
||||
assert returned_account1["imap_port"] == account1.imap_port
|
||||
assert returned_account1["imap_security"] == account1.imap_security
|
||||
assert returned_account1["character_set"] == account1.character_set
|
||||
|
||||
def test_create_mail_account(self) -> None:
|
||||
def test_create_mail_account(
|
||||
self,
|
||||
mail_api_client: APIClient,
|
||||
) -> None:
|
||||
"""
|
||||
WHEN:
|
||||
- API request is made to add a mail account
|
||||
THEN:
|
||||
- A new mail account is created
|
||||
"""
|
||||
|
||||
account1 = {
|
||||
"name": "Email1",
|
||||
"username": "username1",
|
||||
@@ -91,24 +84,27 @@ class TestAPIMailAccounts(DirectoriesMixin, APITestCase):
|
||||
"character_set": "UTF-8",
|
||||
}
|
||||
|
||||
response = self.client.post(
|
||||
self.ENDPOINT,
|
||||
response = mail_api_client.post(
|
||||
MAIL_ACCOUNTS_ENDPOINT,
|
||||
data=account1,
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
|
||||
assert response.status_code == status.HTTP_201_CREATED
|
||||
|
||||
returned_account1 = MailAccount.objects.get(name="Email1")
|
||||
|
||||
self.assertEqual(returned_account1.name, account1["name"])
|
||||
self.assertEqual(returned_account1.username, account1["username"])
|
||||
self.assertEqual(returned_account1.password, account1["password"])
|
||||
self.assertEqual(returned_account1.imap_server, account1["imap_server"])
|
||||
self.assertEqual(returned_account1.imap_port, account1["imap_port"])
|
||||
self.assertEqual(returned_account1.imap_security, account1["imap_security"])
|
||||
self.assertEqual(returned_account1.character_set, account1["character_set"])
|
||||
assert returned_account1.name == account1["name"]
|
||||
assert returned_account1.username == account1["username"]
|
||||
assert returned_account1.password == account1["password"]
|
||||
assert returned_account1.imap_server == account1["imap_server"]
|
||||
assert returned_account1.imap_port == account1["imap_port"]
|
||||
assert returned_account1.imap_security == account1["imap_security"]
|
||||
assert returned_account1.character_set == account1["character_set"]
|
||||
|
||||
def test_delete_mail_account(self) -> None:
|
||||
def test_delete_mail_account(
|
||||
self,
|
||||
mail_api_client: APIClient,
|
||||
) -> None:
|
||||
"""
|
||||
GIVEN:
|
||||
- Existing mail account
|
||||
@@ -117,18 +113,20 @@ class TestAPIMailAccounts(DirectoriesMixin, APITestCase):
|
||||
THEN:
|
||||
- Account is deleted
|
||||
"""
|
||||
|
||||
account1 = MailAccountFactory()
|
||||
|
||||
response = self.client.delete(
|
||||
f"{self.ENDPOINT}{account1.pk}/",
|
||||
response = mail_api_client.delete(
|
||||
f"{MAIL_ACCOUNTS_ENDPOINT}{account1.pk}/",
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT)
|
||||
assert response.status_code == status.HTTP_204_NO_CONTENT
|
||||
|
||||
self.assertEqual(len(MailAccount.objects.all()), 0)
|
||||
assert MailAccount.objects.count() == 0
|
||||
|
||||
def test_update_mail_account(self) -> None:
|
||||
def test_update_mail_account(
|
||||
self,
|
||||
mail_api_client: APIClient,
|
||||
) -> None:
|
||||
"""
|
||||
GIVEN:
|
||||
- Existing mail accounts
|
||||
@@ -137,49 +135,51 @@ class TestAPIMailAccounts(DirectoriesMixin, APITestCase):
|
||||
THEN:
|
||||
- The mail account is updated, password only updated if not '****'
|
||||
"""
|
||||
|
||||
account1 = MailAccountFactory()
|
||||
|
||||
response = self.client.patch(
|
||||
f"{self.ENDPOINT}{account1.pk}/",
|
||||
response = mail_api_client.patch(
|
||||
f"{MAIL_ACCOUNTS_ENDPOINT}{account1.pk}/",
|
||||
data={
|
||||
"name": "Updated Name 1",
|
||||
"password": "******",
|
||||
},
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, status.HTTP_200_OK)
|
||||
assert response.status_code == status.HTTP_200_OK
|
||||
|
||||
returned_account1 = MailAccount.objects.get(pk=account1.pk)
|
||||
self.assertEqual(returned_account1.name, "Updated Name 1")
|
||||
self.assertEqual(returned_account1.password, account1.password)
|
||||
assert returned_account1.name == "Updated Name 1"
|
||||
assert returned_account1.password == account1.password
|
||||
|
||||
response = self.client.patch(
|
||||
f"{self.ENDPOINT}{account1.pk}/",
|
||||
response = mail_api_client.patch(
|
||||
f"{MAIL_ACCOUNTS_ENDPOINT}{account1.pk}/",
|
||||
data={
|
||||
"name": "Updated Name 2",
|
||||
"password": "123xyz",
|
||||
},
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, status.HTTP_200_OK)
|
||||
assert response.status_code == status.HTTP_200_OK
|
||||
|
||||
returned_account2 = MailAccount.objects.get(pk=account1.pk)
|
||||
self.assertEqual(returned_account2.name, "Updated Name 2")
|
||||
self.assertEqual(returned_account2.password, "123xyz")
|
||||
assert returned_account2.name == "Updated Name 2"
|
||||
assert returned_account2.password == "123xyz"
|
||||
|
||||
def test_mail_account_test_fail(self) -> None:
|
||||
def test_mail_account_test_fail(
|
||||
self,
|
||||
mail_api_client: APIClient,
|
||||
bogus_mailbox: "BogusMailBox",
|
||||
) -> None:
|
||||
"""
|
||||
GIVEN:
|
||||
- Errnoeous mail account details
|
||||
- Erroneous mail account details
|
||||
WHEN:
|
||||
- API call is made to test account
|
||||
THEN:
|
||||
- API returns 400 bad request
|
||||
"""
|
||||
|
||||
response = self.client.post(
|
||||
f"{self.ENDPOINT}test/",
|
||||
response = mail_api_client.post(
|
||||
MAIL_ACCOUNTS_TEST_ENDPOINT,
|
||||
json.dumps(
|
||||
{
|
||||
"imap_server": "server.example.com",
|
||||
@@ -192,9 +192,13 @@ class TestAPIMailAccounts(DirectoriesMixin, APITestCase):
|
||||
content_type="application/json",
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
|
||||
assert response.status_code == status.HTTP_400_BAD_REQUEST
|
||||
|
||||
def test_mail_account_test_success(self) -> None:
|
||||
def test_mail_account_test_success(
|
||||
self,
|
||||
mail_api_client: APIClient,
|
||||
bogus_mailbox: "BogusMailBox",
|
||||
) -> None:
|
||||
"""
|
||||
GIVEN:
|
||||
- Working mail account details
|
||||
@@ -203,9 +207,8 @@ class TestAPIMailAccounts(DirectoriesMixin, APITestCase):
|
||||
THEN:
|
||||
- API returns success
|
||||
"""
|
||||
|
||||
response = self.client.post(
|
||||
f"{self.ENDPOINT}test/",
|
||||
response = mail_api_client.post(
|
||||
MAIL_ACCOUNTS_TEST_ENDPOINT,
|
||||
json.dumps(
|
||||
{
|
||||
"imap_server": "server.example.com",
|
||||
@@ -217,10 +220,14 @@ class TestAPIMailAccounts(DirectoriesMixin, APITestCase):
|
||||
),
|
||||
content_type="application/json",
|
||||
)
|
||||
self.assertEqual(response.status_code, status.HTTP_200_OK)
|
||||
self.assertEqual(response.data["success"], True)
|
||||
assert response.status_code == status.HTTP_200_OK
|
||||
assert response.data["success"] is True
|
||||
|
||||
def test_mail_account_test_existing(self) -> None:
|
||||
def test_mail_account_test_existing(
|
||||
self,
|
||||
mail_api_client: APIClient,
|
||||
bogus_mailbox: "BogusMailBox",
|
||||
) -> None:
|
||||
"""
|
||||
GIVEN:
|
||||
- Testing server details for an existing account with obfuscated password (***)
|
||||
@@ -236,8 +243,8 @@ class TestAPIMailAccounts(DirectoriesMixin, APITestCase):
|
||||
imap_port=443,
|
||||
)
|
||||
|
||||
response = self.client.post(
|
||||
f"{self.ENDPOINT}test/",
|
||||
response = mail_api_client.post(
|
||||
MAIL_ACCOUNTS_TEST_ENDPOINT,
|
||||
json.dumps(
|
||||
{
|
||||
"id": account.pk,
|
||||
@@ -250,12 +257,16 @@ class TestAPIMailAccounts(DirectoriesMixin, APITestCase):
|
||||
),
|
||||
content_type="application/json",
|
||||
)
|
||||
self.assertEqual(response.status_code, status.HTTP_200_OK)
|
||||
self.assertEqual(response.data["success"], True)
|
||||
assert response.status_code == status.HTTP_200_OK
|
||||
assert response.data["success"] is True
|
||||
|
||||
def test_mail_account_test_existing_nonexistent_id_forbidden(self) -> None:
|
||||
response = self.client.post(
|
||||
f"{self.ENDPOINT}test/",
|
||||
def test_mail_account_test_existing_nonexistent_id_forbidden(
|
||||
self,
|
||||
mail_api_client: APIClient,
|
||||
bogus_mailbox: "BogusMailBox",
|
||||
) -> None:
|
||||
response = mail_api_client.post(
|
||||
MAIL_ACCOUNTS_TEST_ENDPOINT,
|
||||
json.dumps(
|
||||
{
|
||||
"id": 999999,
|
||||
@@ -268,10 +279,15 @@ class TestAPIMailAccounts(DirectoriesMixin, APITestCase):
|
||||
),
|
||||
content_type="application/json",
|
||||
)
|
||||
self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
|
||||
self.assertEqual(response.content.decode(), "Insufficient permissions")
|
||||
assert response.status_code == status.HTTP_403_FORBIDDEN
|
||||
assert response.content.decode() == "Insufficient permissions"
|
||||
|
||||
def test_get_mail_accounts_owner_aware(self) -> None:
|
||||
def test_get_mail_accounts_owner_aware(
|
||||
self,
|
||||
mail_api_client: APIClient,
|
||||
mail_api_user: User,
|
||||
django_user_model: type[User],
|
||||
) -> None:
|
||||
"""
|
||||
GIVEN:
|
||||
- Configured accounts with different users
|
||||
@@ -280,36 +296,29 @@ class TestAPIMailAccounts(DirectoriesMixin, APITestCase):
|
||||
THEN:
|
||||
- Only unowned, owned by user or granted accounts are provided
|
||||
"""
|
||||
|
||||
user2 = User.objects.create_user(username="temp_admin2")
|
||||
user2 = django_user_model.objects.create_user(username="temp_admin2")
|
||||
|
||||
account1 = MailAccountFactory(name="Email1")
|
||||
account2 = MailAccountFactory(name="Email2", owner=self.user)
|
||||
account2 = MailAccountFactory(name="Email2", owner=mail_api_user)
|
||||
_account3 = MailAccountFactory(name="Email3", owner=user2)
|
||||
account4 = MailAccountFactory(name="Email4", owner=user2)
|
||||
assign_perm("view_mailaccount", self.user, account4)
|
||||
assign_perm("view_mailaccount", mail_api_user, account4)
|
||||
|
||||
response = self.client.get(self.ENDPOINT)
|
||||
response = mail_api_client.get(MAIL_ACCOUNTS_ENDPOINT)
|
||||
|
||||
self.assertEqual(response.status_code, status.HTTP_200_OK)
|
||||
self.assertEqual(response.data["count"], 3)
|
||||
self.assertEqual(response.data["results"][0]["name"], account1.name)
|
||||
self.assertEqual(response.data["results"][1]["name"], account2.name)
|
||||
self.assertEqual(response.data["results"][2]["name"], account4.name)
|
||||
assert response.status_code == status.HTTP_200_OK
|
||||
assert response.data["count"] == 3
|
||||
assert response.data["results"][0]["name"] == account1.name
|
||||
assert response.data["results"][1]["name"] == account2.name
|
||||
assert response.data["results"][2]["name"] == account4.name
|
||||
|
||||
|
||||
class TestAPIMailRules(DirectoriesMixin, APITestCase):
|
||||
ENDPOINT = "/api/mail_rules/"
|
||||
|
||||
def setUp(self) -> None:
|
||||
super().setUp()
|
||||
|
||||
self.user = User.objects.create_user(username="temp_admin")
|
||||
self.user.user_permissions.add(*Permission.objects.all())
|
||||
self.user.save()
|
||||
self.client.force_authenticate(user=self.user)
|
||||
|
||||
def test_get_mail_rules(self) -> None:
|
||||
@pytest.mark.django_db
|
||||
class TestAPIMailRules:
|
||||
def test_get_mail_rules(
|
||||
self,
|
||||
mail_api_client: APIClient,
|
||||
) -> None:
|
||||
"""
|
||||
GIVEN:
|
||||
- Configured mail accounts and rules
|
||||
@@ -318,7 +327,6 @@ class TestAPIMailRules(DirectoriesMixin, APITestCase):
|
||||
THEN:
|
||||
- Configured mail rules are provided
|
||||
"""
|
||||
|
||||
account1 = MailAccountFactory()
|
||||
rule1 = MailRuleFactory(
|
||||
name="Rule1",
|
||||
@@ -330,34 +338,37 @@ class TestAPIMailRules(DirectoriesMixin, APITestCase):
|
||||
filter_attachment_filename_include="file.pdf",
|
||||
)
|
||||
|
||||
response = self.client.get(self.ENDPOINT)
|
||||
response = mail_api_client.get(MAIL_RULES_ENDPOINT)
|
||||
|
||||
self.assertEqual(response.status_code, status.HTTP_200_OK)
|
||||
self.assertEqual(response.data["count"], 1)
|
||||
assert response.status_code == status.HTTP_200_OK
|
||||
assert response.data["count"] == 1
|
||||
returned_rule1 = response.data["results"][0]
|
||||
|
||||
self.assertEqual(returned_rule1["name"], rule1.name)
|
||||
self.assertEqual(returned_rule1["account"], account1.pk)
|
||||
self.assertEqual(returned_rule1["folder"], rule1.folder)
|
||||
self.assertEqual(returned_rule1["filter_from"], rule1.filter_from)
|
||||
self.assertEqual(returned_rule1["filter_to"], rule1.filter_to)
|
||||
self.assertEqual(returned_rule1["filter_subject"], rule1.filter_subject)
|
||||
self.assertEqual(returned_rule1["filter_body"], rule1.filter_body)
|
||||
self.assertEqual(
|
||||
returned_rule1["filter_attachment_filename_include"],
|
||||
rule1.filter_attachment_filename_include,
|
||||
assert returned_rule1["name"] == rule1.name
|
||||
assert returned_rule1["account"] == account1.pk
|
||||
assert returned_rule1["folder"] == rule1.folder
|
||||
assert returned_rule1["filter_from"] == rule1.filter_from
|
||||
assert returned_rule1["filter_to"] == rule1.filter_to
|
||||
assert returned_rule1["filter_subject"] == rule1.filter_subject
|
||||
assert returned_rule1["filter_body"] == rule1.filter_body
|
||||
assert (
|
||||
returned_rule1["filter_attachment_filename_include"]
|
||||
== rule1.filter_attachment_filename_include
|
||||
)
|
||||
self.assertEqual(returned_rule1["maximum_age"], rule1.maximum_age)
|
||||
self.assertEqual(returned_rule1["action"], rule1.action)
|
||||
self.assertEqual(returned_rule1["assign_title_from"], rule1.assign_title_from)
|
||||
self.assertEqual(
|
||||
returned_rule1["assign_correspondent_from"],
|
||||
rule1.assign_correspondent_from,
|
||||
assert returned_rule1["maximum_age"] == rule1.maximum_age
|
||||
assert returned_rule1["action"] == rule1.action
|
||||
assert returned_rule1["assign_title_from"] == rule1.assign_title_from
|
||||
assert (
|
||||
returned_rule1["assign_correspondent_from"]
|
||||
== rule1.assign_correspondent_from
|
||||
)
|
||||
self.assertEqual(returned_rule1["order"], rule1.order)
|
||||
self.assertEqual(returned_rule1["attachment_type"], rule1.attachment_type)
|
||||
assert returned_rule1["order"] == rule1.order
|
||||
assert returned_rule1["attachment_type"] == rule1.attachment_type
|
||||
|
||||
def test_create_mail_rule(self) -> None:
|
||||
def test_create_mail_rule(
|
||||
self,
|
||||
mail_api_client: APIClient,
|
||||
) -> None:
|
||||
"""
|
||||
GIVEN:
|
||||
- Configured mail account exists
|
||||
@@ -366,7 +377,6 @@ class TestAPIMailRules(DirectoriesMixin, APITestCase):
|
||||
THEN:
|
||||
- A new mail rule is created
|
||||
"""
|
||||
|
||||
account1 = MailAccountFactory()
|
||||
tag = TagFactory(name="t")
|
||||
correspondent = CorrespondentFactory(name="c")
|
||||
@@ -394,58 +404,51 @@ class TestAPIMailRules(DirectoriesMixin, APITestCase):
|
||||
"assign_owner_from_rule": True,
|
||||
}
|
||||
|
||||
response = self.client.post(
|
||||
self.ENDPOINT,
|
||||
response = mail_api_client.post(
|
||||
MAIL_RULES_ENDPOINT,
|
||||
data=rule1,
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
|
||||
assert response.status_code == status.HTTP_201_CREATED
|
||||
|
||||
response = self.client.get(self.ENDPOINT)
|
||||
response = mail_api_client.get(MAIL_RULES_ENDPOINT)
|
||||
|
||||
self.assertEqual(response.status_code, status.HTTP_200_OK)
|
||||
self.assertEqual(response.data["count"], 1)
|
||||
assert response.status_code == status.HTTP_200_OK
|
||||
assert response.data["count"] == 1
|
||||
returned_rule1 = response.data["results"][0]
|
||||
|
||||
self.assertEqual(returned_rule1["name"], rule1["name"])
|
||||
self.assertEqual(returned_rule1["account"], account1.pk)
|
||||
self.assertEqual(returned_rule1["folder"], rule1["folder"])
|
||||
self.assertEqual(returned_rule1["filter_from"], rule1["filter_from"])
|
||||
self.assertEqual(returned_rule1["filter_to"], rule1["filter_to"])
|
||||
self.assertEqual(returned_rule1["filter_subject"], rule1["filter_subject"])
|
||||
self.assertEqual(returned_rule1["filter_body"], rule1["filter_body"])
|
||||
self.assertEqual(
|
||||
returned_rule1["filter_attachment_filename_include"],
|
||||
rule1["filter_attachment_filename_include"],
|
||||
assert returned_rule1["name"] == rule1["name"]
|
||||
assert returned_rule1["account"] == account1.pk
|
||||
assert returned_rule1["folder"] == rule1["folder"]
|
||||
assert returned_rule1["filter_from"] == rule1["filter_from"]
|
||||
assert returned_rule1["filter_to"] == rule1["filter_to"]
|
||||
assert returned_rule1["filter_subject"] == rule1["filter_subject"]
|
||||
assert returned_rule1["filter_body"] == rule1["filter_body"]
|
||||
assert (
|
||||
returned_rule1["filter_attachment_filename_include"]
|
||||
== rule1["filter_attachment_filename_include"]
|
||||
)
|
||||
self.assertEqual(returned_rule1["maximum_age"], rule1["maximum_age"])
|
||||
self.assertEqual(returned_rule1["action"], rule1["action"])
|
||||
self.assertEqual(
|
||||
returned_rule1["assign_title_from"],
|
||||
rule1["assign_title_from"],
|
||||
assert returned_rule1["maximum_age"] == rule1["maximum_age"]
|
||||
assert returned_rule1["action"] == rule1["action"]
|
||||
assert returned_rule1["assign_title_from"] == rule1["assign_title_from"]
|
||||
assert (
|
||||
returned_rule1["assign_correspondent_from"]
|
||||
== rule1["assign_correspondent_from"]
|
||||
)
|
||||
self.assertEqual(
|
||||
returned_rule1["assign_correspondent_from"],
|
||||
rule1["assign_correspondent_from"],
|
||||
)
|
||||
self.assertEqual(returned_rule1["order"], rule1["order"])
|
||||
self.assertEqual(returned_rule1["attachment_type"], rule1["attachment_type"])
|
||||
self.assertEqual(returned_rule1["action_parameter"], rule1["action_parameter"])
|
||||
self.assertEqual(
|
||||
returned_rule1["assign_correspondent"],
|
||||
rule1["assign_correspondent"],
|
||||
)
|
||||
self.assertEqual(
|
||||
returned_rule1["assign_document_type"],
|
||||
rule1["assign_document_type"],
|
||||
)
|
||||
self.assertEqual(returned_rule1["assign_tags"], rule1["assign_tags"])
|
||||
self.assertEqual(
|
||||
returned_rule1["assign_owner_from_rule"],
|
||||
rule1["assign_owner_from_rule"],
|
||||
assert returned_rule1["order"] == rule1["order"]
|
||||
assert returned_rule1["attachment_type"] == rule1["attachment_type"]
|
||||
assert returned_rule1["action_parameter"] == rule1["action_parameter"]
|
||||
assert returned_rule1["assign_correspondent"] == rule1["assign_correspondent"]
|
||||
assert returned_rule1["assign_document_type"] == rule1["assign_document_type"]
|
||||
assert returned_rule1["assign_tags"] == rule1["assign_tags"]
|
||||
assert (
|
||||
returned_rule1["assign_owner_from_rule"] == rule1["assign_owner_from_rule"]
|
||||
)
|
||||
|
||||
def test_delete_mail_rule(self) -> None:
|
||||
def test_delete_mail_rule(
|
||||
self,
|
||||
mail_api_client: APIClient,
|
||||
) -> None:
|
||||
"""
|
||||
GIVEN:
|
||||
- Existing mail rule
|
||||
@@ -454,19 +457,21 @@ class TestAPIMailRules(DirectoriesMixin, APITestCase):
|
||||
THEN:
|
||||
- Rule is deleted
|
||||
"""
|
||||
|
||||
account1 = MailAccountFactory()
|
||||
rule1 = MailRuleFactory(account=account1)
|
||||
|
||||
response = self.client.delete(
|
||||
f"{self.ENDPOINT}{rule1.pk}/",
|
||||
response = mail_api_client.delete(
|
||||
f"{MAIL_RULES_ENDPOINT}{rule1.pk}/",
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT)
|
||||
assert response.status_code == status.HTTP_204_NO_CONTENT
|
||||
|
||||
self.assertEqual(len(MailRule.objects.all()), 0)
|
||||
assert MailRule.objects.count() == 0
|
||||
|
||||
def test_update_mail_rule(self) -> None:
|
||||
def test_update_mail_rule(
|
||||
self,
|
||||
mail_api_client: APIClient,
|
||||
) -> None:
|
||||
"""
|
||||
GIVEN:
|
||||
- Existing mail rule
|
||||
@@ -475,30 +480,33 @@ class TestAPIMailRules(DirectoriesMixin, APITestCase):
|
||||
THEN:
|
||||
- The mail rule is updated
|
||||
"""
|
||||
|
||||
account1 = MailAccountFactory()
|
||||
rule1 = MailRuleFactory(account=account1)
|
||||
|
||||
response = self.client.patch(
|
||||
f"{self.ENDPOINT}{rule1.pk}/",
|
||||
response = mail_api_client.patch(
|
||||
f"{MAIL_RULES_ENDPOINT}{rule1.pk}/",
|
||||
data={
|
||||
"name": "Updated Name 1",
|
||||
"action": MailRule.MailAction.DELETE,
|
||||
},
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, status.HTTP_200_OK)
|
||||
assert response.status_code == status.HTTP_200_OK
|
||||
|
||||
returned_rule1 = MailRule.objects.get(pk=rule1.pk)
|
||||
self.assertEqual(returned_rule1.name, "Updated Name 1")
|
||||
self.assertEqual(returned_rule1.action, MailRule.MailAction.DELETE)
|
||||
assert returned_rule1.name == "Updated Name 1"
|
||||
assert returned_rule1.action == MailRule.MailAction.DELETE
|
||||
|
||||
def test_create_mail_rule_scopes_accounts(self) -> None:
|
||||
other_user = User.objects.create_user(username="mail-owner")
|
||||
def test_create_mail_rule_scopes_accounts(
|
||||
self,
|
||||
mail_api_client: APIClient,
|
||||
django_user_model: type[User],
|
||||
) -> None:
|
||||
other_user = django_user_model.objects.create_user(username="mail-owner")
|
||||
foreign_account = MailAccountFactory(name="ForeignEmail", owner=other_user)
|
||||
|
||||
response = self.client.post(
|
||||
self.ENDPOINT,
|
||||
response = mail_api_client.post(
|
||||
MAIL_RULES_ENDPOINT,
|
||||
data={
|
||||
"name": "Rule1",
|
||||
"account": foreign_account.pk,
|
||||
@@ -512,8 +520,8 @@ class TestAPIMailRules(DirectoriesMixin, APITestCase):
|
||||
"attachment_type": MailRule.AttachmentProcessing.ATTACHMENTS_ONLY,
|
||||
},
|
||||
)
|
||||
missing_response = self.client.post(
|
||||
self.ENDPOINT,
|
||||
missing_response = mail_api_client.post(
|
||||
MAIL_RULES_ENDPOINT,
|
||||
data={
|
||||
"name": "Rule1",
|
||||
"account": foreign_account.pk + 1000,
|
||||
@@ -528,21 +536,24 @@ class TestAPIMailRules(DirectoriesMixin, APITestCase):
|
||||
},
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
|
||||
self.assertEqual(missing_response.status_code, status.HTTP_400_BAD_REQUEST)
|
||||
self.assertEqual(response.data["account"][0].code, "does_not_exist")
|
||||
self.assertEqual(missing_response.data["account"][0].code, "does_not_exist")
|
||||
self.assertEqual(MailRule.objects.count(), 0)
|
||||
assert response.status_code == status.HTTP_400_BAD_REQUEST
|
||||
assert missing_response.status_code == status.HTTP_400_BAD_REQUEST
|
||||
assert response.data["account"][0].code == "does_not_exist"
|
||||
assert missing_response.data["account"][0].code == "does_not_exist"
|
||||
assert MailRule.objects.count() == 0
|
||||
|
||||
def test_create_mail_rule_allowed_for_granted_account_change_permission(
|
||||
self,
|
||||
mail_api_client: APIClient,
|
||||
mail_api_user: User,
|
||||
django_user_model: type[User],
|
||||
) -> None:
|
||||
other_user = User.objects.create_user(username="mail-owner")
|
||||
other_user = django_user_model.objects.create_user(username="mail-owner")
|
||||
foreign_account = MailAccountFactory(name="ForeignEmail", owner=other_user)
|
||||
assign_perm("change_mailaccount", self.user, foreign_account)
|
||||
assign_perm("change_mailaccount", mail_api_user, foreign_account)
|
||||
|
||||
response = self.client.post(
|
||||
self.ENDPOINT,
|
||||
response = mail_api_client.post(
|
||||
MAIL_RULES_ENDPOINT,
|
||||
data={
|
||||
"name": "Rule1",
|
||||
"account": foreign_account.pk,
|
||||
@@ -557,25 +568,34 @@ class TestAPIMailRules(DirectoriesMixin, APITestCase):
|
||||
},
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
|
||||
self.assertEqual(MailRule.objects.get().account, foreign_account)
|
||||
assert response.status_code == status.HTTP_201_CREATED
|
||||
assert MailRule.objects.get().account == foreign_account
|
||||
|
||||
def test_update_mail_rule_forbidden_for_unpermitted_account(self) -> None:
|
||||
def test_update_mail_rule_forbidden_for_unpermitted_account(
|
||||
self,
|
||||
mail_api_client: APIClient,
|
||||
django_user_model: type[User],
|
||||
) -> None:
|
||||
own_account = MailAccountFactory()
|
||||
other_user = User.objects.create_user(username="mail-owner")
|
||||
other_user = django_user_model.objects.create_user(username="mail-owner")
|
||||
foreign_account = MailAccountFactory(owner=other_user)
|
||||
rule1 = MailRuleFactory(account=own_account)
|
||||
|
||||
response = self.client.patch(
|
||||
f"{self.ENDPOINT}{rule1.pk}/",
|
||||
response = mail_api_client.patch(
|
||||
f"{MAIL_RULES_ENDPOINT}{rule1.pk}/",
|
||||
data={"account": foreign_account.pk},
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
|
||||
assert response.status_code == status.HTTP_400_BAD_REQUEST
|
||||
rule1.refresh_from_db()
|
||||
self.assertEqual(rule1.account, own_account)
|
||||
assert rule1.account == own_account
|
||||
|
||||
def test_get_mail_rules_owner_aware(self) -> None:
|
||||
def test_get_mail_rules_owner_aware(
|
||||
self,
|
||||
mail_api_client: APIClient,
|
||||
mail_api_user: User,
|
||||
django_user_model: type[User],
|
||||
) -> None:
|
||||
"""
|
||||
GIVEN:
|
||||
- Configured rules with different users
|
||||
@@ -584,24 +604,26 @@ class TestAPIMailRules(DirectoriesMixin, APITestCase):
|
||||
THEN:
|
||||
- Only unowned, owned by user or granted mail rules are provided
|
||||
"""
|
||||
|
||||
user2 = User.objects.create_user(username="temp_admin2")
|
||||
user2 = django_user_model.objects.create_user(username="temp_admin2")
|
||||
account1 = MailAccountFactory()
|
||||
rule1 = MailRuleFactory(account=account1, order=0)
|
||||
rule2 = MailRuleFactory(account=account1, order=1, owner=self.user)
|
||||
rule2 = MailRuleFactory(account=account1, order=1, owner=mail_api_user)
|
||||
MailRuleFactory(account=account1, order=2, owner=user2)
|
||||
rule4 = MailRuleFactory(account=account1, order=3, owner=user2)
|
||||
assign_perm("view_mailrule", self.user, rule4)
|
||||
assign_perm("view_mailrule", mail_api_user, rule4)
|
||||
|
||||
response = self.client.get(self.ENDPOINT)
|
||||
response = mail_api_client.get(MAIL_RULES_ENDPOINT)
|
||||
|
||||
self.assertEqual(response.status_code, status.HTTP_200_OK)
|
||||
self.assertEqual(response.data["count"], 3)
|
||||
self.assertEqual(response.data["results"][0]["name"], rule1.name)
|
||||
self.assertEqual(response.data["results"][1]["name"], rule2.name)
|
||||
self.assertEqual(response.data["results"][2]["name"], rule4.name)
|
||||
assert response.status_code == status.HTTP_200_OK
|
||||
assert response.data["count"] == 3
|
||||
assert response.data["results"][0]["name"] == rule1.name
|
||||
assert response.data["results"][1]["name"] == rule2.name
|
||||
assert response.data["results"][2]["name"] == rule4.name
|
||||
|
||||
def test_mailrule_maxage_validation(self) -> None:
|
||||
def test_mailrule_maxage_validation(
|
||||
self,
|
||||
mail_api_client: APIClient,
|
||||
) -> None:
|
||||
"""
|
||||
GIVEN:
|
||||
- An existing mail account
|
||||
@@ -629,24 +651,24 @@ class TestAPIMailRules(DirectoriesMixin, APITestCase):
|
||||
"attachment_type": MailRule.AttachmentProcessing.ATTACHMENTS_ONLY,
|
||||
}
|
||||
|
||||
response = self.client.post(self.ENDPOINT, data=rule_data, format="json")
|
||||
response = mail_api_client.post(
|
||||
MAIL_RULES_ENDPOINT,
|
||||
data=rule_data,
|
||||
format="json",
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
|
||||
self.assertIn("maximum_age", response.data)
|
||||
assert response.status_code == status.HTTP_400_BAD_REQUEST
|
||||
assert "maximum_age" in response.data
|
||||
|
||||
|
||||
class TestAPIProcessedMails(DirectoriesMixin, APITestCase):
|
||||
ENDPOINT = "/api/processed_mail/"
|
||||
|
||||
def setUp(self) -> None:
|
||||
super().setUp()
|
||||
|
||||
self.user = User.objects.create_user(username="temp_admin")
|
||||
self.user.user_permissions.add(*Permission.objects.all())
|
||||
self.user.save()
|
||||
self.client.force_authenticate(user=self.user)
|
||||
|
||||
def test_get_processed_mails_owner_aware(self) -> None:
|
||||
@pytest.mark.django_db
|
||||
class TestAPIProcessedMails:
|
||||
def test_get_processed_mails_owner_aware(
|
||||
self,
|
||||
mail_api_client: APIClient,
|
||||
mail_api_user: User,
|
||||
django_user_model: type[User],
|
||||
) -> None:
|
||||
"""
|
||||
GIVEN:
|
||||
- Configured processed mails with different users
|
||||
@@ -655,27 +677,31 @@ class TestAPIProcessedMails(DirectoriesMixin, APITestCase):
|
||||
THEN:
|
||||
- Only unowned, owned by user or granted processed mails are provided
|
||||
"""
|
||||
user2 = User.objects.create_user(username="temp_admin2")
|
||||
user2 = django_user_model.objects.create_user(username="temp_admin2")
|
||||
rule = MailRuleFactory()
|
||||
pm1 = ProcessedMailFactory(rule=rule)
|
||||
pm2 = ProcessedMailFactory(
|
||||
rule=rule,
|
||||
status="FAILED",
|
||||
error="err",
|
||||
owner=self.user,
|
||||
owner=mail_api_user,
|
||||
)
|
||||
ProcessedMailFactory(rule=rule, owner=user2)
|
||||
pm4 = ProcessedMailFactory(rule=rule, owner=user2)
|
||||
assign_perm("view_processedmail", self.user, pm4)
|
||||
assign_perm("view_processedmail", mail_api_user, pm4)
|
||||
|
||||
response = self.client.get(self.ENDPOINT)
|
||||
response = mail_api_client.get(PROCESSED_MAIL_ENDPOINT)
|
||||
|
||||
self.assertEqual(response.status_code, status.HTTP_200_OK)
|
||||
self.assertEqual(response.data["count"], 3)
|
||||
assert response.status_code == status.HTTP_200_OK
|
||||
assert response.data["count"] == 3
|
||||
returned_ids = {r["id"] for r in response.data["results"]}
|
||||
self.assertSetEqual(returned_ids, {pm1.id, pm2.id, pm4.id})
|
||||
assert returned_ids == {pm1.id, pm2.id, pm4.id}
|
||||
|
||||
def test_get_processed_mails_filter_by_rule(self) -> None:
|
||||
def test_get_processed_mails_filter_by_rule(
|
||||
self,
|
||||
mail_api_client: APIClient,
|
||||
mail_api_user: User,
|
||||
) -> None:
|
||||
"""
|
||||
GIVEN:
|
||||
- Processed mails belonging to two different rules
|
||||
@@ -687,17 +713,22 @@ class TestAPIProcessedMails(DirectoriesMixin, APITestCase):
|
||||
account = MailAccountFactory()
|
||||
rule1 = MailRuleFactory(account=account)
|
||||
rule2 = MailRuleFactory(account=account)
|
||||
pm1 = ProcessedMailFactory(rule=rule1, owner=self.user)
|
||||
pm1 = ProcessedMailFactory(rule=rule1, owner=mail_api_user)
|
||||
pm2 = ProcessedMailFactory(rule=rule1, status="FAILED", error="e")
|
||||
ProcessedMailFactory(rule=rule2)
|
||||
|
||||
response = self.client.get(f"{self.ENDPOINT}?rule={rule1.pk}")
|
||||
response = mail_api_client.get(f"{PROCESSED_MAIL_ENDPOINT}?rule={rule1.pk}")
|
||||
|
||||
self.assertEqual(response.status_code, status.HTTP_200_OK)
|
||||
assert response.status_code == status.HTTP_200_OK
|
||||
returned_ids = {r["id"] for r in response.data["results"]}
|
||||
self.assertSetEqual(returned_ids, {pm1.id, pm2.id})
|
||||
assert returned_ids == {pm1.id, pm2.id}
|
||||
|
||||
def test_bulk_delete_processed_mails(self) -> None:
|
||||
def test_bulk_delete_processed_mails(
|
||||
self,
|
||||
mail_api_client: APIClient,
|
||||
mail_api_user: User,
|
||||
django_user_model: type[User],
|
||||
) -> None:
|
||||
"""
|
||||
GIVEN:
|
||||
- Processed mails belonging to two different rules and different users
|
||||
@@ -706,7 +737,7 @@ class TestAPIProcessedMails(DirectoriesMixin, APITestCase):
|
||||
THEN:
|
||||
- Only the specified processed mails are deleted, respecting ownership and permissions
|
||||
"""
|
||||
user2 = User.objects.create_user(username="temp_admin2")
|
||||
user2 = django_user_model.objects.create_user(username="temp_admin2")
|
||||
rule = MailRuleFactory()
|
||||
# unowned, owned by self, and one with explicit object perm
|
||||
pm_unowned = ProcessedMailFactory(rule=rule)
|
||||
@@ -714,46 +745,47 @@ class TestAPIProcessedMails(DirectoriesMixin, APITestCase):
|
||||
rule=rule,
|
||||
status="FAILED",
|
||||
error="e",
|
||||
owner=self.user,
|
||||
owner=mail_api_user,
|
||||
)
|
||||
pm_granted = ProcessedMailFactory(rule=rule, owner=user2)
|
||||
assign_perm("delete_processedmail", self.user, pm_granted)
|
||||
assign_perm("delete_processedmail", mail_api_user, pm_granted)
|
||||
pm_forbidden = ProcessedMailFactory(rule=rule, owner=user2)
|
||||
|
||||
# Success for allowed items
|
||||
response = self.client.post(
|
||||
f"{self.ENDPOINT}bulk_delete/",
|
||||
response = mail_api_client.post(
|
||||
PROCESSED_MAIL_BULK_DELETE_ENDPOINT,
|
||||
data={
|
||||
"mail_ids": [pm_unowned.id, pm_owned.id, pm_granted.id],
|
||||
},
|
||||
format="json",
|
||||
)
|
||||
self.assertEqual(response.status_code, status.HTTP_200_OK)
|
||||
self.assertEqual(response.data["result"], "OK")
|
||||
self.assertSetEqual(
|
||||
set(response.data["deleted_mail_ids"]),
|
||||
{pm_unowned.id, pm_owned.id, pm_granted.id},
|
||||
)
|
||||
self.assertFalse(ProcessedMail.objects.filter(id=pm_unowned.id).exists())
|
||||
self.assertFalse(ProcessedMail.objects.filter(id=pm_owned.id).exists())
|
||||
self.assertFalse(ProcessedMail.objects.filter(id=pm_granted.id).exists())
|
||||
self.assertTrue(ProcessedMail.objects.filter(id=pm_forbidden.id).exists())
|
||||
assert response.status_code == status.HTTP_200_OK
|
||||
assert response.data["result"] == "OK"
|
||||
assert set(response.data["deleted_mail_ids"]) == {
|
||||
pm_unowned.id,
|
||||
pm_owned.id,
|
||||
pm_granted.id,
|
||||
}
|
||||
assert not ProcessedMail.objects.filter(id=pm_unowned.id).exists()
|
||||
assert not ProcessedMail.objects.filter(id=pm_owned.id).exists()
|
||||
assert not ProcessedMail.objects.filter(id=pm_granted.id).exists()
|
||||
assert ProcessedMail.objects.filter(id=pm_forbidden.id).exists()
|
||||
|
||||
# 403 and not deleted
|
||||
response = self.client.post(
|
||||
f"{self.ENDPOINT}bulk_delete/",
|
||||
response = mail_api_client.post(
|
||||
PROCESSED_MAIL_BULK_DELETE_ENDPOINT,
|
||||
data={
|
||||
"mail_ids": [pm_forbidden.id],
|
||||
},
|
||||
format="json",
|
||||
)
|
||||
self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
|
||||
self.assertTrue(ProcessedMail.objects.filter(id=pm_forbidden.id).exists())
|
||||
assert response.status_code == status.HTTP_403_FORBIDDEN
|
||||
assert ProcessedMail.objects.filter(id=pm_forbidden.id).exists()
|
||||
|
||||
# missing mail_ids
|
||||
response = self.client.post(
|
||||
f"{self.ENDPOINT}bulk_delete/",
|
||||
response = mail_api_client.post(
|
||||
PROCESSED_MAIL_BULK_DELETE_ENDPOINT,
|
||||
data={"mail_ids": "not-a-list"},
|
||||
format="json",
|
||||
)
|
||||
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
|
||||
assert response.status_code == status.HTTP_400_BAD_REQUEST
|
||||
|
||||
@@ -1,13 +1,14 @@
|
||||
from datetime import timedelta
|
||||
from unittest import mock
|
||||
|
||||
import pytest
|
||||
import pytest_mock
|
||||
from django.contrib.auth.models import Permission
|
||||
from django.contrib.auth.models import User
|
||||
from django.test import TestCase
|
||||
from django.test import override_settings
|
||||
from django.test import Client
|
||||
from django.utils import timezone
|
||||
from httpx_oauth.oauth2 import GetAccessTokenError
|
||||
from httpx_oauth.oauth2 import RefreshTokenError
|
||||
from pytest_django.fixtures import SettingsWrapper
|
||||
from rest_framework import status
|
||||
|
||||
from paperless_mail.mail import MailAccountHandler
|
||||
@@ -16,340 +17,374 @@ from paperless_mail.oauth import PaperlessMailOAuth2Manager
|
||||
from paperless_mail.tests.factories import MailAccountFactory
|
||||
|
||||
|
||||
@override_settings(
|
||||
OAUTH_CALLBACK_BASE_URL="http://localhost:8000",
|
||||
GMAIL_OAUTH_CLIENT_ID="test_gmail_client_id",
|
||||
GMAIL_OAUTH_CLIENT_SECRET="test_gmail_client_secret",
|
||||
OUTLOOK_OAUTH_CLIENT_ID="test_outlook_client_id",
|
||||
OUTLOOK_OAUTH_CLIENT_SECRET="test_outlook_client_secret",
|
||||
)
|
||||
class TestMailOAuth(
|
||||
TestCase,
|
||||
):
|
||||
def setUp(self) -> None:
|
||||
self.user = User.objects.create_user("testuser")
|
||||
self.user.user_permissions.add(
|
||||
*Permission.objects.filter(
|
||||
codename__in=[
|
||||
"add_mailaccount",
|
||||
],
|
||||
@pytest.fixture()
|
||||
def oauth_manager() -> PaperlessMailOAuth2Manager:
|
||||
return PaperlessMailOAuth2Manager()
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def oauth_session(client: Client) -> Client:
|
||||
"""Seed the test client session with a known oauth_state."""
|
||||
session = client.session
|
||||
session.update({"oauth_state": "test_state"})
|
||||
session.save()
|
||||
return client
|
||||
|
||||
|
||||
class TestOAuthUrlGeneration:
|
||||
"""OAuth callback / redirect URL construction by PaperlessMailOAuth2Manager."""
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("overrides", "expected"),
|
||||
[
|
||||
pytest.param(
|
||||
{"OAUTH_CALLBACK_BASE_URL": "http://paperless.example.com"},
|
||||
"http://paperless.example.com/api/oauth/callback/",
|
||||
id="callback-base-url-set",
|
||||
),
|
||||
)
|
||||
self.user.save()
|
||||
self.client.force_login(self.user)
|
||||
self.mail_account_handler = MailAccountHandler()
|
||||
super().setUp()
|
||||
|
||||
def test_generate_paths(self) -> None:
|
||||
"""
|
||||
GIVEN:
|
||||
- Mocked settings for OAuth callback and base URLs
|
||||
WHEN:
|
||||
- get_oauth_callback_url and get_oauth_redirect_url are called
|
||||
THEN:
|
||||
- Correct URLs are generated
|
||||
"""
|
||||
# Callback URL
|
||||
oauth_manager = PaperlessMailOAuth2Manager()
|
||||
with override_settings(OAUTH_CALLBACK_BASE_URL="http://paperless.example.com"):
|
||||
self.assertEqual(
|
||||
oauth_manager.oauth_callback_url,
|
||||
pytest.param(
|
||||
{
|
||||
"OAUTH_CALLBACK_BASE_URL": None,
|
||||
"PAPERLESS_URL": "http://paperless.example.com",
|
||||
},
|
||||
"http://paperless.example.com/api/oauth/callback/",
|
||||
)
|
||||
with override_settings(
|
||||
OAUTH_CALLBACK_BASE_URL=None,
|
||||
PAPERLESS_URL="http://paperless.example.com",
|
||||
):
|
||||
self.assertEqual(
|
||||
oauth_manager.oauth_callback_url,
|
||||
"http://paperless.example.com/api/oauth/callback/",
|
||||
)
|
||||
with override_settings(
|
||||
OAUTH_CALLBACK_BASE_URL=None,
|
||||
PAPERLESS_URL="http://paperless.example.com",
|
||||
BASE_URL="/paperless/",
|
||||
):
|
||||
self.assertEqual(
|
||||
oauth_manager.oauth_callback_url,
|
||||
id="falls-back-to-paperless-url",
|
||||
),
|
||||
pytest.param(
|
||||
{
|
||||
"OAUTH_CALLBACK_BASE_URL": None,
|
||||
"PAPERLESS_URL": "http://paperless.example.com",
|
||||
"BASE_URL": "/paperless/",
|
||||
},
|
||||
"http://paperless.example.com/paperless/api/oauth/callback/",
|
||||
)
|
||||
|
||||
# Redirect URL
|
||||
with override_settings(DEBUG=True):
|
||||
self.assertEqual(
|
||||
oauth_manager.oauth_redirect_url,
|
||||
"http://localhost:4200/mail",
|
||||
)
|
||||
with override_settings(DEBUG=False):
|
||||
self.assertEqual(
|
||||
oauth_manager.oauth_redirect_url,
|
||||
"/mail",
|
||||
)
|
||||
|
||||
@mock.patch(
|
||||
"paperless_mail.oauth.PaperlessMailOAuth2Manager.get_gmail_access_token",
|
||||
id="respects-base-url-prefix",
|
||||
),
|
||||
],
|
||||
)
|
||||
@mock.patch(
|
||||
"paperless_mail.oauth.PaperlessMailOAuth2Manager.get_outlook_access_token",
|
||||
)
|
||||
def test_oauth_callback_view_success(
|
||||
def test_oauth_callback_url(
|
||||
self,
|
||||
mock_get_outlook_access_token,
|
||||
mock_get_gmail_access_token,
|
||||
settings: SettingsWrapper,
|
||||
oauth_manager: PaperlessMailOAuth2Manager,
|
||||
overrides: dict,
|
||||
expected: str,
|
||||
) -> None:
|
||||
"""
|
||||
GIVEN:
|
||||
- Mocked settings for Gmail and Outlook OAuth client IDs and secrets
|
||||
- Various combinations of OAUTH_CALLBACK_BASE_URL, PAPERLESS_URL, and BASE_URL
|
||||
WHEN:
|
||||
- OAuth callback is called with a code and scope
|
||||
- OAuth callback is called with a code and no scope
|
||||
- oauth_callback_url is read from the manager
|
||||
THEN:
|
||||
- Gmail mail account is created
|
||||
- Outlook mail account is created
|
||||
- The expected fully-qualified callback URL is produced
|
||||
"""
|
||||
for key, value in overrides.items():
|
||||
setattr(settings, key, value)
|
||||
assert oauth_manager.oauth_callback_url == expected
|
||||
|
||||
mock_get_gmail_access_token.return_value = {
|
||||
"access_token": "test_access_token",
|
||||
"refresh_token": "test_refresh_token",
|
||||
"expires_in": 3600,
|
||||
}
|
||||
mock_get_outlook_access_token.return_value = {
|
||||
"access_token": "test_access_token",
|
||||
"refresh_token": "test_refresh_token",
|
||||
"expires_in": 3600,
|
||||
}
|
||||
|
||||
session = self.client.session
|
||||
session.update(
|
||||
{
|
||||
"oauth_state": "test_state",
|
||||
},
|
||||
)
|
||||
session.save()
|
||||
|
||||
# Test Google OAuth callback
|
||||
response = self.client.get(
|
||||
"/api/oauth/callback/?code=test_code&scope=https://mail.google.com/&state=test_state",
|
||||
)
|
||||
self.assertEqual(response.status_code, status.HTTP_302_FOUND)
|
||||
self.assertIn("oauth_success=1", response.url)
|
||||
mock_get_gmail_access_token.assert_called_once()
|
||||
self.assertTrue(
|
||||
MailAccount.objects.filter(imap_server="imap.gmail.com").exists(),
|
||||
)
|
||||
|
||||
# Test Outlook OAuth callback
|
||||
response = self.client.get(
|
||||
"/api/oauth/callback/?code=test_code&state=test_state",
|
||||
)
|
||||
self.assertEqual(response.status_code, status.HTTP_302_FOUND)
|
||||
self.assertIn("oauth_success=1", response.url)
|
||||
self.assertTrue(
|
||||
MailAccount.objects.filter(imap_server="outlook.office365.com").exists(),
|
||||
)
|
||||
|
||||
@mock.patch("httpx_oauth.oauth2.BaseOAuth2.get_access_token")
|
||||
def test_oauth_callback_view_fails(self, mock_get_access_token) -> None:
|
||||
"""
|
||||
GIVEN:
|
||||
- Mocked settings for Gmail and Outlook OAuth client IDs and secrets
|
||||
WHEN:
|
||||
- OAuth callback is called and get access token returns an error
|
||||
THEN:
|
||||
- No mail account is created
|
||||
- Error is logged
|
||||
"""
|
||||
mock_get_access_token.side_effect = GetAccessTokenError("test_error")
|
||||
|
||||
session = self.client.session
|
||||
session.update(
|
||||
{
|
||||
"oauth_state": "test_state",
|
||||
},
|
||||
)
|
||||
session.save()
|
||||
|
||||
with self.assertLogs("paperless_mail", level="ERROR") as cm:
|
||||
# Test Google OAuth callback
|
||||
response = self.client.get(
|
||||
"/api/oauth/callback/?code=test_code&scope=https://mail.google.com/&state=test_state",
|
||||
)
|
||||
self.assertEqual(response.status_code, status.HTTP_302_FOUND)
|
||||
self.assertIn("oauth_success=0", response.url)
|
||||
self.assertFalse(
|
||||
MailAccount.objects.filter(imap_server="imap.gmail.com").exists(),
|
||||
)
|
||||
|
||||
# Test Outlook OAuth callback
|
||||
response = self.client.get(
|
||||
"/api/oauth/callback/?code=test_code&state=test_state",
|
||||
)
|
||||
self.assertEqual(response.status_code, status.HTTP_302_FOUND)
|
||||
self.assertIn("oauth_success=0", response.url)
|
||||
self.assertFalse(
|
||||
MailAccount.objects.filter(
|
||||
imap_server="outlook.office365.com",
|
||||
).exists(),
|
||||
)
|
||||
|
||||
self.assertIn(
|
||||
"Error getting access token from OAuth provider",
|
||||
cm.output[0],
|
||||
)
|
||||
|
||||
def test_oauth_callback_view_insufficient_permissions(self) -> None:
|
||||
"""
|
||||
GIVEN:
|
||||
- Mocked settings for Gmail and Outlook OAuth client IDs and secrets
|
||||
- User without add_mailaccount permission
|
||||
WHEN:
|
||||
- OAuth callback is called
|
||||
THEN:
|
||||
- 400 bad request returned, no mail accounts are created
|
||||
"""
|
||||
self.user.user_permissions.remove(
|
||||
*Permission.objects.filter(
|
||||
codename__in=[
|
||||
"add_mailaccount",
|
||||
],
|
||||
@pytest.mark.parametrize(
|
||||
("debug", "expected"),
|
||||
[
|
||||
pytest.param(
|
||||
True,
|
||||
"http://localhost:4200/mail",
|
||||
id="debug-redirects-to-ng-dev",
|
||||
),
|
||||
)
|
||||
self.user.save()
|
||||
|
||||
response = self.client.get(
|
||||
"/api/oauth/callback/?code=test_code&scope=https://mail.google.com/",
|
||||
)
|
||||
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
|
||||
self.assertFalse(
|
||||
MailAccount.objects.filter(imap_server="imap.gmail.com").exists(),
|
||||
)
|
||||
self.assertFalse(
|
||||
MailAccount.objects.filter(imap_server="outlook.office365.com").exists(),
|
||||
)
|
||||
|
||||
def test_oauth_callback_view_no_code(self) -> None:
|
||||
pytest.param(False, "/mail", id="prod-redirects-to-relative-path"),
|
||||
],
|
||||
)
|
||||
def test_oauth_redirect_url(
|
||||
self,
|
||||
settings: SettingsWrapper,
|
||||
oauth_manager: PaperlessMailOAuth2Manager,
|
||||
debug: bool, # noqa: FBT001
|
||||
expected: str,
|
||||
) -> None:
|
||||
"""
|
||||
GIVEN:
|
||||
- Mocked settings for Gmail and Outlook OAuth client IDs and secrets
|
||||
- DEBUG is toggled on or off
|
||||
WHEN:
|
||||
- OAuth callback is called without a code
|
||||
- oauth_redirect_url is read from the manager
|
||||
THEN:
|
||||
- 400 bad request returned, no mail accounts are created
|
||||
- In DEBUG mode the Angular dev server URL is returned, otherwise a relative path
|
||||
"""
|
||||
settings.DEBUG = debug
|
||||
assert oauth_manager.oauth_redirect_url == expected
|
||||
|
||||
response = self.client.get(
|
||||
"/api/oauth/callback/",
|
||||
)
|
||||
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
|
||||
self.assertFalse(
|
||||
MailAccount.objects.filter(imap_server="imap.gmail.com").exists(),
|
||||
)
|
||||
self.assertFalse(
|
||||
MailAccount.objects.filter(imap_server="outlook.office365.com").exists(),
|
||||
)
|
||||
|
||||
def test_oauth_callback_view_invalid_state(self) -> None:
|
||||
@pytest.mark.django_db
|
||||
class TestOAuthCallbackView:
|
||||
"""End-to-end behavior of the /api/oauth/callback/ endpoint."""
|
||||
|
||||
def test_no_code(
|
||||
self,
|
||||
client: Client,
|
||||
mail_user: User,
|
||||
oauth_settings: SettingsWrapper,
|
||||
) -> None:
|
||||
"""
|
||||
GIVEN:
|
||||
- Mocked settings for Gmail and Outlook OAuth client IDs and secrets
|
||||
- OAuth client IDs and secrets configured
|
||||
WHEN:
|
||||
- OAuth callback is called with an invalid state
|
||||
- The OAuth callback is called without a code parameter
|
||||
THEN:
|
||||
- 400 bad request returned, no mail accounts are created
|
||||
- 400 Bad Request is returned and no mail account is created
|
||||
"""
|
||||
response = client.get("/api/oauth/callback/")
|
||||
assert response.status_code == status.HTTP_400_BAD_REQUEST
|
||||
assert not MailAccount.objects.filter(imap_server="imap.gmail.com").exists()
|
||||
assert not MailAccount.objects.filter(
|
||||
imap_server="outlook.office365.com",
|
||||
).exists()
|
||||
|
||||
response = self.client.get(
|
||||
def test_invalid_state(
|
||||
self,
|
||||
client: Client,
|
||||
mail_user: User,
|
||||
oauth_settings: SettingsWrapper,
|
||||
) -> None:
|
||||
"""
|
||||
GIVEN:
|
||||
- OAuth client IDs and secrets configured
|
||||
WHEN:
|
||||
- The OAuth callback is called with a state that does not match the session
|
||||
THEN:
|
||||
- 400 Bad Request is returned and no mail account is created
|
||||
"""
|
||||
response = client.get(
|
||||
"/api/oauth/callback/?code=test_code&state=invalid_state",
|
||||
)
|
||||
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
|
||||
self.assertFalse(
|
||||
MailAccount.objects.filter(imap_server="imap.gmail.com").exists(),
|
||||
)
|
||||
self.assertFalse(
|
||||
MailAccount.objects.filter(imap_server="outlook.office365.com").exists(),
|
||||
)
|
||||
assert response.status_code == status.HTTP_400_BAD_REQUEST
|
||||
assert not MailAccount.objects.filter(imap_server="imap.gmail.com").exists()
|
||||
assert not MailAccount.objects.filter(
|
||||
imap_server="outlook.office365.com",
|
||||
).exists()
|
||||
|
||||
@mock.patch("paperless_mail.mail.get_mailbox")
|
||||
@mock.patch(
|
||||
"httpx_oauth.oauth2.BaseOAuth2.refresh_token",
|
||||
)
|
||||
def test_refresh_token_on_handle_mail_account(
|
||||
def test_insufficient_permissions(
|
||||
self,
|
||||
mock_refresh_token,
|
||||
mock_get_mailbox,
|
||||
client: Client,
|
||||
mail_user: User,
|
||||
oauth_settings: SettingsWrapper,
|
||||
) -> None:
|
||||
"""
|
||||
GIVEN:
|
||||
- Mail account with refresh token and expiration
|
||||
- OAuth client IDs and secrets configured
|
||||
- User without add_mailaccount permission
|
||||
WHEN:
|
||||
- handle_mail_account is called
|
||||
- The OAuth callback is called
|
||||
THEN:
|
||||
- Refresh token is called
|
||||
- 400 Bad Request is returned and no mail account is created
|
||||
"""
|
||||
|
||||
mock_mailbox = mock.MagicMock()
|
||||
mock_get_mailbox.return_value.__enter__.return_value = mock_mailbox
|
||||
|
||||
mail_account = MailAccountFactory(
|
||||
name="Test Gmail Mail Account",
|
||||
username="test_username",
|
||||
account_type=MailAccount.MailAccountType.GMAIL_OAUTH,
|
||||
is_token=True,
|
||||
refresh_token="test_refresh_token",
|
||||
expiration=timezone.now() - timedelta(days=1),
|
||||
mail_user.user_permissions.remove(
|
||||
*Permission.objects.filter(codename__in=["add_mailaccount"]),
|
||||
)
|
||||
mail_user.save()
|
||||
|
||||
mock_refresh_token.return_value = {
|
||||
response = client.get(
|
||||
"/api/oauth/callback/?code=test_code&scope=https://mail.google.com/",
|
||||
)
|
||||
assert response.status_code == status.HTTP_400_BAD_REQUEST
|
||||
assert not MailAccount.objects.filter(imap_server="imap.gmail.com").exists()
|
||||
assert not MailAccount.objects.filter(
|
||||
imap_server="outlook.office365.com",
|
||||
).exists()
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("provider", "callback_query", "expected_imap"),
|
||||
[
|
||||
pytest.param(
|
||||
"gmail",
|
||||
"code=test_code&scope=https://mail.google.com/&state=test_state",
|
||||
"imap.gmail.com",
|
||||
id="gmail",
|
||||
),
|
||||
pytest.param(
|
||||
"outlook",
|
||||
"code=test_code&state=test_state",
|
||||
"outlook.office365.com",
|
||||
id="outlook",
|
||||
),
|
||||
],
|
||||
)
|
||||
def test_success(
|
||||
self,
|
||||
client: Client,
|
||||
mail_user: User,
|
||||
oauth_settings: SettingsWrapper,
|
||||
oauth_session: Client,
|
||||
mocker: pytest_mock.MockerFixture,
|
||||
provider: str,
|
||||
callback_query: str,
|
||||
expected_imap: str,
|
||||
) -> None:
|
||||
"""
|
||||
GIVEN:
|
||||
- OAuth client IDs and secrets configured for Gmail and Outlook
|
||||
- A valid oauth_state seeded in the session
|
||||
WHEN:
|
||||
- The OAuth callback is called with a code and provider-specific scope
|
||||
THEN:
|
||||
- A redirect with oauth_success=1 is returned
|
||||
- The provider's access-token method is invoked
|
||||
- A mail account for the matching provider is created
|
||||
"""
|
||||
token_payload = {
|
||||
"access_token": "test_access_token",
|
||||
"refresh_token": "test_refresh_token",
|
||||
"expires_in": 3600,
|
||||
}
|
||||
target = (
|
||||
"paperless_mail.oauth.PaperlessMailOAuth2Manager.get_gmail_access_token"
|
||||
if provider == "gmail"
|
||||
else "paperless_mail.oauth.PaperlessMailOAuth2Manager.get_outlook_access_token"
|
||||
)
|
||||
mocked = mocker.patch(target, return_value=token_payload)
|
||||
|
||||
self.mail_account_handler.handle_mail_account(mail_account)
|
||||
mock_refresh_token.assert_called_once()
|
||||
mock_refresh_token.reset_mock()
|
||||
response = client.get(f"/api/oauth/callback/?{callback_query}")
|
||||
|
||||
mock_refresh_token.return_value = {
|
||||
"access_token": "test_access_token",
|
||||
"refresh_token": "test_refresh",
|
||||
"expires_in": 3600,
|
||||
}
|
||||
outlook_mail_account = MailAccountFactory(
|
||||
name="Test Outlook Mail Account",
|
||||
assert response.status_code == status.HTTP_302_FOUND
|
||||
assert "oauth_success=1" in response.url
|
||||
mocked.assert_called_once()
|
||||
assert MailAccount.objects.filter(imap_server=expected_imap).exists()
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("callback_query", "imap_server"),
|
||||
[
|
||||
pytest.param(
|
||||
"code=test_code&scope=https://mail.google.com/&state=test_state",
|
||||
"imap.gmail.com",
|
||||
id="gmail",
|
||||
),
|
||||
pytest.param(
|
||||
"code=test_code&state=test_state",
|
||||
"outlook.office365.com",
|
||||
id="outlook",
|
||||
),
|
||||
],
|
||||
)
|
||||
def test_provider_error(
|
||||
self,
|
||||
client: Client,
|
||||
mail_user: User,
|
||||
oauth_settings: SettingsWrapper,
|
||||
oauth_session: Client,
|
||||
mocker: pytest_mock.MockerFixture,
|
||||
caplog: pytest.LogCaptureFixture,
|
||||
callback_query: str,
|
||||
imap_server: str,
|
||||
) -> None:
|
||||
"""
|
||||
GIVEN:
|
||||
- OAuth client IDs and secrets configured
|
||||
- The provider's access-token endpoint raises GetAccessTokenError
|
||||
WHEN:
|
||||
- The OAuth callback is called with a code (Gmail or Outlook)
|
||||
THEN:
|
||||
- A redirect with oauth_success=0 is returned
|
||||
- No mail account is created
|
||||
- The failure is logged at ERROR level
|
||||
"""
|
||||
mocker.patch(
|
||||
"httpx_oauth.oauth2.BaseOAuth2.get_access_token",
|
||||
side_effect=GetAccessTokenError("test_error"),
|
||||
)
|
||||
|
||||
with caplog.at_level("ERROR", logger="paperless_mail"):
|
||||
response = client.get(f"/api/oauth/callback/?{callback_query}")
|
||||
|
||||
assert response.status_code == status.HTTP_302_FOUND
|
||||
assert "oauth_success=0" in response.url
|
||||
assert not MailAccount.objects.filter(imap_server=imap_server).exists()
|
||||
assert any(
|
||||
"Error getting access token from OAuth provider" in record.message
|
||||
for record in caplog.records
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
class TestRefreshTokenOnHandleMailAccount:
|
||||
"""OAuth refresh-token flow exercised through MailAccountHandler.handle_mail_account."""
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("account_type", "name"),
|
||||
[
|
||||
pytest.param(
|
||||
MailAccount.MailAccountType.GMAIL_OAUTH,
|
||||
"Test Gmail",
|
||||
id="gmail",
|
||||
),
|
||||
pytest.param(
|
||||
MailAccount.MailAccountType.OUTLOOK_OAUTH,
|
||||
"Test Outlook",
|
||||
id="outlook",
|
||||
),
|
||||
],
|
||||
)
|
||||
def test_refresh_token_called(
|
||||
self,
|
||||
mocker: pytest_mock.MockerFixture,
|
||||
mail_account_handler: MailAccountHandler,
|
||||
account_type: MailAccount.MailAccountType,
|
||||
name: str,
|
||||
) -> None:
|
||||
"""
|
||||
GIVEN:
|
||||
- An OAuth-backed mail account with a refresh token and an expired access token
|
||||
WHEN:
|
||||
- handle_mail_account is called
|
||||
THEN:
|
||||
- The OAuth refresh_token endpoint is invoked exactly once
|
||||
"""
|
||||
mock_mailbox = mocker.MagicMock()
|
||||
mocker.patch(
|
||||
"paperless_mail.mail.get_mailbox",
|
||||
).return_value.__enter__.return_value = mock_mailbox
|
||||
mock_refresh = mocker.patch(
|
||||
"httpx_oauth.oauth2.BaseOAuth2.refresh_token",
|
||||
return_value={
|
||||
"access_token": "test_access_token",
|
||||
"refresh_token": "test_refresh_token",
|
||||
"expires_in": 3600,
|
||||
},
|
||||
)
|
||||
|
||||
account = MailAccountFactory(
|
||||
name=name,
|
||||
username="test_username",
|
||||
account_type=MailAccount.MailAccountType.OUTLOOK_OAUTH,
|
||||
account_type=account_type,
|
||||
is_token=True,
|
||||
refresh_token="test_refresh_token",
|
||||
expiration=timezone.now() - timedelta(days=1),
|
||||
)
|
||||
|
||||
self.mail_account_handler.handle_mail_account(outlook_mail_account)
|
||||
mock_refresh_token.assert_called_once()
|
||||
mail_account_handler.handle_mail_account(account)
|
||||
mock_refresh.assert_called_once()
|
||||
|
||||
@mock.patch("paperless_mail.mail.get_mailbox")
|
||||
@mock.patch(
|
||||
"httpx_oauth.oauth2.BaseOAuth2.refresh_token",
|
||||
)
|
||||
def test_refresh_token_on_handle_mail_account_fails(
|
||||
def test_refresh_token_failure(
|
||||
self,
|
||||
mock_refresh_token,
|
||||
mock_get_mailbox,
|
||||
mocker: pytest_mock.MockerFixture,
|
||||
caplog: pytest.LogCaptureFixture,
|
||||
mail_account_handler: MailAccountHandler,
|
||||
) -> None:
|
||||
"""
|
||||
GIVEN:
|
||||
- Mail account with refresh token and expiration
|
||||
- An OAuth-backed mail account with a refresh token and an expired access token
|
||||
- The OAuth refresh_token endpoint raises RefreshTokenError
|
||||
WHEN:
|
||||
- handle_mail_account is called
|
||||
- Refresh token is called but fails
|
||||
THEN:
|
||||
- Error is logged
|
||||
- 0 processed mails is returned
|
||||
- The failure is logged at ERROR level with the account context
|
||||
"""
|
||||
mock_mailbox = mocker.MagicMock()
|
||||
mocker.patch(
|
||||
"paperless_mail.mail.get_mailbox",
|
||||
).return_value.__enter__.return_value = mock_mailbox
|
||||
mock_refresh = mocker.patch(
|
||||
"httpx_oauth.oauth2.BaseOAuth2.refresh_token",
|
||||
side_effect=RefreshTokenError("test_error"),
|
||||
)
|
||||
|
||||
mock_mailbox = mock.MagicMock()
|
||||
mock_get_mailbox.return_value.__enter__.return_value = mock_mailbox
|
||||
|
||||
mail_account = MailAccountFactory(
|
||||
account = MailAccountFactory(
|
||||
name="Test Gmail Mail Account",
|
||||
username="test_username",
|
||||
account_type=MailAccount.MailAccountType.GMAIL_OAUTH,
|
||||
@@ -358,16 +393,13 @@ class TestMailOAuth(
|
||||
expiration=timezone.now() - timedelta(days=1),
|
||||
)
|
||||
|
||||
mock_refresh_token.side_effect = RefreshTokenError("test_error")
|
||||
with caplog.at_level("ERROR", logger="paperless_mail"):
|
||||
result = mail_account_handler.handle_mail_account(account)
|
||||
|
||||
with self.assertLogs("paperless_mail", level="ERROR") as cm:
|
||||
# returns 0 processed mails
|
||||
self.assertEqual(
|
||||
self.mail_account_handler.handle_mail_account(mail_account),
|
||||
0,
|
||||
)
|
||||
mock_refresh_token.assert_called_once()
|
||||
self.assertIn(
|
||||
f"Failed to refresh oauth token for account {mail_account}: test_error",
|
||||
cm.output[0],
|
||||
)
|
||||
assert result == 0
|
||||
mock_refresh.assert_called_once()
|
||||
assert any(
|
||||
f"Failed to refresh oauth token for account {account}: test_error"
|
||||
in record.message
|
||||
for record in caplog.records
|
||||
)
|
||||
|
||||
@@ -1,28 +1,31 @@
|
||||
import email
|
||||
import email.contentmanager
|
||||
import shutil
|
||||
import subprocess
|
||||
import tempfile
|
||||
from collections.abc import Generator
|
||||
from email.message import Message
|
||||
from email.mime.application import MIMEApplication
|
||||
from email.mime.multipart import MIMEMultipart
|
||||
from unittest import mock
|
||||
from pathlib import Path
|
||||
|
||||
import gnupg
|
||||
from django.test import override_settings
|
||||
import pytest
|
||||
from imap_tools import MailMessage
|
||||
|
||||
from paperless_mail.mail import MailAccountHandler
|
||||
from paperless_mail.models import MailRule
|
||||
from paperless_mail.preprocessor import MailMessageDecryptor
|
||||
from paperless_mail.tests.factories import MailAccountFactory
|
||||
from paperless_mail.tests.test_mail import TestMail
|
||||
from paperless_mail.tests.test_mail import _AttachmentDef
|
||||
|
||||
|
||||
class MessageEncryptor:
|
||||
def __init__(self) -> None:
|
||||
self.gpg_home = tempfile.mkdtemp()
|
||||
"""
|
||||
Test helper: generates a throwaway GPG keypair in a tempdir and exposes
|
||||
`encrypt(MailMessage) -> MailMessage`.
|
||||
"""
|
||||
|
||||
def __init__(self, gpg_home: Path) -> None:
|
||||
self.gpg_home = str(gpg_home)
|
||||
self.gpg = gnupg.GPG(gnupghome=self.gpg_home)
|
||||
self._testUser = "testuser@example.com"
|
||||
# Generate a new key
|
||||
@@ -36,9 +39,9 @@ class MessageEncryptor:
|
||||
)
|
||||
self.gpg.gen_key(input_data)
|
||||
|
||||
def cleanup(self) -> None:
|
||||
def kill_agent(self) -> None:
|
||||
"""
|
||||
Kill the gpg-agent process and clean up the temporary GPG home directory.
|
||||
Kill the gpg-agent so pytest can remove the GPG home.
|
||||
|
||||
This uses gpgconf to properly terminate the agent, which is the officially
|
||||
recommended cleanup method from the GnuPG project. python-gnupg does not
|
||||
@@ -57,9 +60,6 @@ class MessageEncryptor:
|
||||
# gpgconf not found or hung - agent will timeout eventually
|
||||
pass
|
||||
|
||||
# Clean up the temporary directory
|
||||
shutil.rmtree(self.gpg_home, ignore_errors=True)
|
||||
|
||||
@staticmethod
|
||||
def get_email_body_without_headers(email_message: Message) -> bytes:
|
||||
"""
|
||||
@@ -74,7 +74,7 @@ class MessageEncryptor:
|
||||
]
|
||||
return message_copy.as_bytes()
|
||||
|
||||
def encrypt(self, message):
|
||||
def encrypt(self, message) -> MailMessage:
|
||||
original_email: email.message.Message = message.obj
|
||||
encrypted_data = self.gpg.encrypt(
|
||||
self.get_email_body_without_headers(original_email),
|
||||
@@ -104,152 +104,181 @@ class MessageEncryptor:
|
||||
)
|
||||
new_email.attach(encrypted_part)
|
||||
|
||||
encrypted_message: MailMessage = MailMessage(
|
||||
return MailMessage(
|
||||
[(f"UID {message.uid}".encode(), new_email.as_bytes())],
|
||||
)
|
||||
return encrypted_message
|
||||
|
||||
|
||||
class TestMailMessageGpgDecryptor(TestMail):
|
||||
@classmethod
|
||||
def setUpClass(cls) -> None:
|
||||
"""Create GPG encryptor once for all tests in this class."""
|
||||
super().setUpClass()
|
||||
cls.messageEncryptor = MessageEncryptor()
|
||||
@pytest.fixture(scope="session")
|
||||
def message_encryptor(
|
||||
tmp_path_factory: pytest.TempPathFactory,
|
||||
) -> Generator[MessageEncryptor, None, None]:
|
||||
"""
|
||||
Session-scoped: GPG keypair generation is slow (~1s+), and nothing in
|
||||
these tests mutates the keyring after creation. The GPG home directory
|
||||
comes from `tmp_path_factory` so pytest cleans it up at session end;
|
||||
we still kill the gpg-agent ourselves so the dir is removable.
|
||||
"""
|
||||
gpg_home = tmp_path_factory.mktemp("gpg-home")
|
||||
encryptor = MessageEncryptor(gpg_home)
|
||||
yield encryptor
|
||||
encryptor.kill_agent()
|
||||
|
||||
@classmethod
|
||||
def tearDownClass(cls) -> None:
|
||||
"""Clean up GPG resources after all tests complete."""
|
||||
if hasattr(cls, "messageEncryptor"):
|
||||
cls.messageEncryptor.cleanup()
|
||||
super().tearDownClass()
|
||||
|
||||
def setUp(self) -> None:
|
||||
with override_settings(
|
||||
EMAIL_GNUPG_HOME=self.messageEncryptor.gpg_home,
|
||||
EMAIL_ENABLE_GPG_DECRYPTOR=True,
|
||||
):
|
||||
super().setUp()
|
||||
@pytest.fixture()
|
||||
def gpg_settings(settings, message_encryptor: MessageEncryptor):
|
||||
settings.EMAIL_GNUPG_HOME = message_encryptor.gpg_home
|
||||
settings.EMAIL_ENABLE_GPG_DECRYPTOR = True
|
||||
return settings
|
||||
|
||||
def test_preprocessor_is_able_to_run(self) -> None:
|
||||
with override_settings(
|
||||
EMAIL_GNUPG_HOME=self.messageEncryptor.gpg_home,
|
||||
EMAIL_ENABLE_GPG_DECRYPTOR=True,
|
||||
):
|
||||
self.assertTrue(MailMessageDecryptor.able_to_run())
|
||||
|
||||
def test_preprocessor_is_able_to_run2(self) -> None:
|
||||
with override_settings(
|
||||
EMAIL_GNUPG_HOME=None,
|
||||
EMAIL_ENABLE_GPG_DECRYPTOR=True,
|
||||
):
|
||||
self.assertTrue(MailMessageDecryptor.able_to_run())
|
||||
@pytest.fixture()
|
||||
def encrypted_pair(mail_mocker, message_encryptor: MessageEncryptor):
|
||||
"""
|
||||
Build a (encrypted, plaintext) MailMessage pair sharing the same UID and
|
||||
headers, with two PDF attachments on the plaintext side.
|
||||
"""
|
||||
plaintext = mail_mocker.messageBuilder.create_message(
|
||||
body="Test message with 2 attachments",
|
||||
attachments=[
|
||||
_AttachmentDef(filename="f1.pdf", disposition="inline"),
|
||||
_AttachmentDef(filename="f2.pdf"),
|
||||
],
|
||||
)
|
||||
encrypted = message_encryptor.encrypt(plaintext)
|
||||
return encrypted, plaintext
|
||||
|
||||
def test_is_not_able_to_run_disabled(self) -> None:
|
||||
with override_settings(
|
||||
EMAIL_ENABLE_GPG_DECRYPTOR=False,
|
||||
):
|
||||
self.assertFalse(MailMessageDecryptor.able_to_run())
|
||||
|
||||
def test_is_not_able_to_run_bogus_path(self) -> None:
|
||||
with override_settings(
|
||||
EMAIL_ENABLE_GPG_DECRYPTOR=True,
|
||||
EMAIL_GNUPG_HOME="_)@# notapath &%#$",
|
||||
):
|
||||
self.assertFalse(MailMessageDecryptor.able_to_run())
|
||||
class TestMailMessageDecryptorAbleToRun:
|
||||
"""`MailMessageDecryptor.able_to_run()` configuration matrix — no DB needed."""
|
||||
|
||||
def test_fails_at_initialization(self) -> None:
|
||||
with (
|
||||
mock.patch("gnupg.GPG.__init__") as mock_run,
|
||||
override_settings(
|
||||
EMAIL_ENABLE_GPG_DECRYPTOR=True,
|
||||
@pytest.mark.parametrize(
|
||||
("settings_overrides", "expected"),
|
||||
[
|
||||
pytest.param(
|
||||
{
|
||||
"EMAIL_GNUPG_HOME": "_gpg_home_marker",
|
||||
"EMAIL_ENABLE_GPG_DECRYPTOR": True,
|
||||
},
|
||||
True,
|
||||
id="enabled-with-valid-home",
|
||||
),
|
||||
):
|
||||
pytest.param(
|
||||
{"EMAIL_GNUPG_HOME": None, "EMAIL_ENABLE_GPG_DECRYPTOR": True},
|
||||
True,
|
||||
id="enabled-with-default-home",
|
||||
),
|
||||
pytest.param(
|
||||
{"EMAIL_ENABLE_GPG_DECRYPTOR": False},
|
||||
False,
|
||||
id="disabled",
|
||||
),
|
||||
pytest.param(
|
||||
{
|
||||
"EMAIL_ENABLE_GPG_DECRYPTOR": True,
|
||||
"EMAIL_GNUPG_HOME": "_)@# notapath &%#$",
|
||||
},
|
||||
False,
|
||||
id="enabled-with-bogus-path",
|
||||
),
|
||||
],
|
||||
)
|
||||
def test_able_to_run(
|
||||
self,
|
||||
settings,
|
||||
message_encryptor: MessageEncryptor,
|
||||
settings_overrides: dict,
|
||||
*,
|
||||
expected: bool,
|
||||
) -> None:
|
||||
for key, value in settings_overrides.items():
|
||||
if value == "_gpg_home_marker":
|
||||
value = message_encryptor.gpg_home
|
||||
setattr(settings, key, value)
|
||||
assert MailMessageDecryptor.able_to_run() is expected
|
||||
|
||||
def side_effect(*args, **kwargs):
|
||||
raise OSError("Cannot find 'gpg' binary")
|
||||
|
||||
mock_run.side_effect = side_effect
|
||||
@pytest.mark.django_db
|
||||
class TestMailMessageDecryptor:
|
||||
"""End-to-end decrypt and consumption flow with a real GPG keyring."""
|
||||
|
||||
handler = MailAccountHandler()
|
||||
self.assertEqual(len(handler._message_preprocessors), 0)
|
||||
def test_fails_at_initialization(self, settings, mocker) -> None:
|
||||
settings.EMAIL_ENABLE_GPG_DECRYPTOR = True
|
||||
mocker.patch(
|
||||
"gnupg.GPG.__init__",
|
||||
side_effect=OSError("Cannot find 'gpg' binary"),
|
||||
)
|
||||
|
||||
def test_decrypt_fails(self) -> None:
|
||||
encrypted_message, _ = self.create_encrypted_unencrypted_message_pair()
|
||||
# This test creates its own empty GPG home to test decryption failure
|
||||
empty_gpg_home = tempfile.mkdtemp()
|
||||
handler = MailAccountHandler()
|
||||
|
||||
assert len(handler._message_preprocessors) == 0
|
||||
|
||||
def test_decrypt_fails(self, settings, encrypted_pair, tmp_path: Path) -> None:
|
||||
"""
|
||||
A decryptor pointed at a fresh empty GPG home cannot decrypt the
|
||||
message — ensure it surfaces an exception rather than silently passing
|
||||
bytes through.
|
||||
"""
|
||||
encrypted_message, _ = encrypted_pair
|
||||
empty_gpg_home = tmp_path / "empty-gpg-home"
|
||||
empty_gpg_home.mkdir()
|
||||
|
||||
settings.EMAIL_ENABLE_GPG_DECRYPTOR = True
|
||||
settings.EMAIL_GNUPG_HOME = str(empty_gpg_home)
|
||||
|
||||
decryptor = MailMessageDecryptor()
|
||||
try:
|
||||
with override_settings(
|
||||
EMAIL_ENABLE_GPG_DECRYPTOR=True,
|
||||
EMAIL_GNUPG_HOME=empty_gpg_home,
|
||||
):
|
||||
message_decryptor = MailMessageDecryptor()
|
||||
self.assertRaises(Exception, message_decryptor.run, encrypted_message)
|
||||
with pytest.raises(Exception):
|
||||
decryptor.run(encrypted_message)
|
||||
finally:
|
||||
# Clean up the temporary GPG home used only by this test
|
||||
try:
|
||||
subprocess.run(
|
||||
["gpgconf", "--kill", "gpg-agent"],
|
||||
env={"GNUPGHOME": empty_gpg_home},
|
||||
env={"GNUPGHOME": str(empty_gpg_home)},
|
||||
check=False,
|
||||
capture_output=True,
|
||||
timeout=5,
|
||||
)
|
||||
except (FileNotFoundError, subprocess.TimeoutExpired):
|
||||
pass
|
||||
shutil.rmtree(empty_gpg_home, ignore_errors=True)
|
||||
|
||||
def test_decrypt_encrypted_mail(self) -> None:
|
||||
def test_decrypt_encrypted_mail(self, gpg_settings, encrypted_pair) -> None:
|
||||
"""
|
||||
Creates a mail with attachments. Then encrypts it with a new key.
|
||||
Verifies that this encrypted message can be decrypted with attachments intact.
|
||||
"""
|
||||
encrypted_message, message = self.create_encrypted_unencrypted_message_pair()
|
||||
headers = message.headers
|
||||
text = message.text
|
||||
encrypted_message, plaintext = encrypted_pair
|
||||
headers = plaintext.headers
|
||||
text = plaintext.text
|
||||
|
||||
self.assertEqual(len(encrypted_message.attachments), 1)
|
||||
self.assertEqual(encrypted_message.attachments[0].filename, "encrypted.asc")
|
||||
self.assertEqual(encrypted_message.text, "")
|
||||
assert len(encrypted_message.attachments) == 1
|
||||
assert encrypted_message.attachments[0].filename == "encrypted.asc"
|
||||
assert encrypted_message.text == ""
|
||||
|
||||
with override_settings(
|
||||
EMAIL_ENABLE_GPG_DECRYPTOR=True,
|
||||
EMAIL_GNUPG_HOME=self.messageEncryptor.gpg_home,
|
||||
):
|
||||
message_decryptor = MailMessageDecryptor()
|
||||
self.assertTrue(message_decryptor.able_to_run())
|
||||
decrypted_message = message_decryptor.run(encrypted_message)
|
||||
decryptor = MailMessageDecryptor()
|
||||
assert decryptor.able_to_run()
|
||||
decrypted = decryptor.run(encrypted_message)
|
||||
|
||||
self.assertEqual(len(decrypted_message.attachments), 2)
|
||||
self.assertEqual(decrypted_message.attachments[0].filename, "f1.pdf")
|
||||
self.assertEqual(decrypted_message.attachments[1].filename, "f2.pdf")
|
||||
self.assertEqual(decrypted_message.headers, headers)
|
||||
self.assertEqual(decrypted_message.text, text)
|
||||
self.assertEqual(decrypted_message.uid, message.uid)
|
||||
assert len(decrypted.attachments) == 2
|
||||
assert decrypted.attachments[0].filename == "f1.pdf"
|
||||
assert decrypted.attachments[1].filename == "f2.pdf"
|
||||
assert decrypted.headers == headers
|
||||
assert decrypted.text == text
|
||||
assert decrypted.uid == plaintext.uid
|
||||
|
||||
def create_encrypted_unencrypted_message_pair(self):
|
||||
message = self.mailMocker.messageBuilder.create_message(
|
||||
body="Test message with 2 attachments",
|
||||
attachments=[
|
||||
_AttachmentDef(
|
||||
filename="f1.pdf",
|
||||
disposition="inline",
|
||||
),
|
||||
_AttachmentDef(filename="f2.pdf"),
|
||||
],
|
||||
)
|
||||
encrypted_message = self.messageEncryptor.encrypt(message)
|
||||
return encrypted_message, message
|
||||
|
||||
def test_handle_encrypted_message(self) -> None:
|
||||
message = self.mailMocker.messageBuilder.create_message(
|
||||
def test_handle_encrypted_message(
|
||||
self,
|
||||
gpg_settings,
|
||||
mail_mocker,
|
||||
message_encryptor: MessageEncryptor,
|
||||
) -> None:
|
||||
plaintext = mail_mocker.messageBuilder.create_message(
|
||||
subject="the message title",
|
||||
from_="Myself",
|
||||
attachments=2,
|
||||
body="Test mail",
|
||||
)
|
||||
|
||||
encrypted_message = self.messageEncryptor.encrypt(message)
|
||||
encrypted = message_encryptor.encrypt(plaintext)
|
||||
|
||||
account = MailAccountFactory()
|
||||
rule = MailRule(
|
||||
@@ -259,18 +288,17 @@ class TestMailMessageGpgDecryptor(TestMail):
|
||||
)
|
||||
rule.save()
|
||||
|
||||
result = self.mail_account_handler._handle_message(encrypted_message, rule)
|
||||
handler = MailAccountHandler()
|
||||
result = handler._handle_message(encrypted, rule)
|
||||
|
||||
self.assertEqual(result, 3)
|
||||
|
||||
self.mailMocker._queue_consumption_tasks_mock.assert_called()
|
||||
|
||||
self.mailMocker.assert_queue_consumption_tasks_call_args(
|
||||
assert result == 3
|
||||
mail_mocker._queue_consumption_tasks_mock.assert_called()
|
||||
mail_mocker.assert_queue_consumption_tasks_call_args(
|
||||
[
|
||||
[
|
||||
{
|
||||
"override_title": message.subject,
|
||||
"override_filename": f"{message.subject}.eml",
|
||||
"override_title": plaintext.subject,
|
||||
"override_filename": f"{plaintext.subject}.eml",
|
||||
},
|
||||
],
|
||||
[
|
||||
|
||||
Reference in New Issue
Block a user