mirror of
https://github.com/paperless-ngx/paperless-ngx.git
synced 2026-05-04 13:45:25 +00:00
Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 1307b66993 |
@@ -11,7 +11,7 @@ concurrency:
|
||||
group: backend-${{ github.event.pull_request.number || github.ref }}
|
||||
cancel-in-progress: true
|
||||
env:
|
||||
DEFAULT_UV_VERSION: "0.10.x"
|
||||
DEFAULT_UV_VERSION: "0.11.x"
|
||||
NLTK_DATA: "/usr/share/nltk_data"
|
||||
permissions: {}
|
||||
jobs:
|
||||
|
||||
@@ -11,7 +11,7 @@ concurrency:
|
||||
permissions:
|
||||
contents: read
|
||||
env:
|
||||
DEFAULT_UV_VERSION: "0.10.x"
|
||||
DEFAULT_UV_VERSION: "0.11.x"
|
||||
DEFAULT_PYTHON_VERSION: "3.12"
|
||||
jobs:
|
||||
changes:
|
||||
|
||||
@@ -8,7 +8,7 @@ concurrency:
|
||||
group: release-${{ github.ref }}
|
||||
cancel-in-progress: false
|
||||
env:
|
||||
DEFAULT_UV_VERSION: "0.10.x"
|
||||
DEFAULT_UV_VERSION: "0.11.x"
|
||||
DEFAULT_PYTHON_VERSION: "3.12"
|
||||
permissions: {}
|
||||
jobs:
|
||||
|
||||
@@ -3,6 +3,8 @@ on:
|
||||
push:
|
||||
branches:
|
||||
- dev
|
||||
env:
|
||||
DEFAULT_UV_VERSION: "0.11.x"
|
||||
jobs:
|
||||
generate-translate-strings:
|
||||
name: Generate Translation Strings
|
||||
@@ -29,6 +31,7 @@ jobs:
|
||||
- name: Install uv
|
||||
uses: astral-sh/setup-uv@cec208311dfd045dd5311c1add060b2062131d57 # v8.0.0
|
||||
with:
|
||||
version: ${{ env.DEFAULT_UV_VERSION }}
|
||||
enable-cache: true
|
||||
- name: Install backend python dependencies
|
||||
run: |
|
||||
|
||||
+1
-1
@@ -30,7 +30,7 @@ RUN set -eux \
|
||||
# Purpose: Installs s6-overlay and rootfs
|
||||
# Comments:
|
||||
# - Don't leave anything extra in here either
|
||||
FROM ghcr.io/astral-sh/uv:0.10.9-python3.12-trixie-slim AS s6-overlay-base
|
||||
FROM ghcr.io/astral-sh/uv:0.11.6-python3.12-trixie-slim AS s6-overlay-base
|
||||
|
||||
WORKDIR /usr/src/s6
|
||||
|
||||
|
||||
@@ -12,7 +12,6 @@ from pytest_django.fixtures import SettingsWrapper
|
||||
from rest_framework.test import APIClient
|
||||
|
||||
from documents.tests.factories import DocumentFactory
|
||||
from documents.tests.factories import UserFactory
|
||||
|
||||
UserModelT = get_user_model()
|
||||
|
||||
@@ -130,9 +129,9 @@ def rest_api_client():
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def regular_user(db) -> UserModelT:
|
||||
def regular_user(django_user_model: type[UserModelT]) -> UserModelT:
|
||||
"""Unprivileged authenticated user for permission boundary tests."""
|
||||
return UserFactory.create()
|
||||
return django_user_model.objects.create_user(username="regular", password="regular")
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
|
||||
@@ -35,7 +35,7 @@ def user_admin() -> PaperlessUserAdmin:
|
||||
|
||||
@pytest.fixture
|
||||
def staff_user(db) -> User:
|
||||
return UserFactory.create(staff=True)
|
||||
return UserFactory.create(username="staff", staff=True)
|
||||
|
||||
|
||||
class TestDocumentAdmin(DirectoriesMixin, TestCase):
|
||||
|
||||
@@ -649,7 +649,7 @@ class TestDocumentConsumptionFinishedSignal:
|
||||
assert doc_with_keyword.correspondent == correspondent
|
||||
|
||||
def test_correspondent_not_applied(self, doc_with_keyword: Document) -> None:
|
||||
CorrespondentFactory.create(
|
||||
TagFactory.create(
|
||||
match="no-match",
|
||||
matching_algorithm=MatchingModel.MATCH_ANY,
|
||||
)
|
||||
|
||||
@@ -1,18 +1,13 @@
|
||||
from collections.abc import Generator
|
||||
|
||||
import pytest
|
||||
from django.contrib.auth.models import Permission
|
||||
from django.contrib.auth.models import User
|
||||
from django.test import Client
|
||||
from pytest_django.fixtures import SettingsWrapper
|
||||
|
||||
from paperless_mail.mail import MailAccountHandler
|
||||
from paperless_mail.models import MailAccount
|
||||
from paperless_mail.tests.factories import MailAccountFactory
|
||||
from paperless_mail.tests.test_mail import MailMocker
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
@pytest.fixture()
|
||||
def greenmail_mail_account(db: None) -> Generator[MailAccount, None, None]:
|
||||
"""
|
||||
Create a mail account configured for local Greenmail server.
|
||||
@@ -29,47 +24,6 @@ def greenmail_mail_account(db: None) -> Generator[MailAccount, None, None]:
|
||||
account.delete()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
@pytest.fixture()
|
||||
def mail_account_handler() -> MailAccountHandler:
|
||||
return MailAccountHandler()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mail_user(db: None, django_user_model, client: Client) -> User:
|
||||
"""
|
||||
Create a user with the `add_mailaccount` permission and log them in via
|
||||
the test client. Returned so tests can mutate permissions if needed.
|
||||
"""
|
||||
user = django_user_model.objects.create_user("testuser")
|
||||
user.user_permissions.add(*Permission.objects.filter(codename="add_mailaccount"))
|
||||
client.force_login(user)
|
||||
return user
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def oauth_settings(settings: SettingsWrapper) -> SettingsWrapper:
|
||||
"""
|
||||
Apply the OAuth callback / client-id settings the OAuth flow needs. Uses
|
||||
pytest-django's `settings` fixture so values are reverted automatically.
|
||||
"""
|
||||
settings.OAUTH_CALLBACK_BASE_URL = "http://localhost:8000"
|
||||
settings.GMAIL_OAUTH_CLIENT_ID = "test_gmail_client_id"
|
||||
settings.GMAIL_OAUTH_CLIENT_SECRET = "test_gmail_client_secret"
|
||||
settings.OUTLOOK_OAUTH_CLIENT_ID = "test_outlook_client_id"
|
||||
settings.OUTLOOK_OAUTH_CLIENT_SECRET = "test_outlook_client_secret"
|
||||
return settings
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mail_mocker(db: None) -> Generator[MailMocker, None, None]:
|
||||
"""
|
||||
Provides a MailMocker instance with its `MailBox` and
|
||||
`queue_consumption_tasks` patches active. Cleanups registered via
|
||||
TestCase.addCleanup are run on teardown by calling doCleanups().
|
||||
"""
|
||||
mocker = MailMocker()
|
||||
mocker.setUp()
|
||||
try:
|
||||
yield mocker
|
||||
finally:
|
||||
mocker.doCleanups()
|
||||
|
||||
@@ -1,14 +1,13 @@
|
||||
from datetime import timedelta
|
||||
from unittest import mock
|
||||
|
||||
import pytest
|
||||
import pytest_mock
|
||||
from django.contrib.auth.models import Permission
|
||||
from django.contrib.auth.models import User
|
||||
from django.test import Client
|
||||
from django.test import TestCase
|
||||
from django.test import override_settings
|
||||
from django.utils import timezone
|
||||
from httpx_oauth.oauth2 import GetAccessTokenError
|
||||
from httpx_oauth.oauth2 import RefreshTokenError
|
||||
from pytest_django.fixtures import SettingsWrapper
|
||||
from rest_framework import status
|
||||
|
||||
from paperless_mail.mail import MailAccountHandler
|
||||
@@ -17,371 +16,358 @@ from paperless_mail.oauth import PaperlessMailOAuth2Manager
|
||||
from paperless_mail.tests.factories import MailAccountFactory
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def oauth_manager() -> PaperlessMailOAuth2Manager:
|
||||
return PaperlessMailOAuth2Manager()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def oauth_session(client: Client) -> Client:
|
||||
"""Seed the test client session with a known oauth_state."""
|
||||
session = client.session
|
||||
session.update({"oauth_state": "test_state"})
|
||||
session.save()
|
||||
return client
|
||||
|
||||
|
||||
class TestOAuthUrlGeneration:
|
||||
"""OAuth callback / redirect URL construction by PaperlessMailOAuth2Manager."""
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("overrides", "expected"),
|
||||
[
|
||||
pytest.param(
|
||||
{"OAUTH_CALLBACK_BASE_URL": "http://paperless.example.com"},
|
||||
"http://paperless.example.com/api/oauth/callback/",
|
||||
id="callback-base-url-set",
|
||||
@override_settings(
|
||||
OAUTH_CALLBACK_BASE_URL="http://localhost:8000",
|
||||
GMAIL_OAUTH_CLIENT_ID="test_gmail_client_id",
|
||||
GMAIL_OAUTH_CLIENT_SECRET="test_gmail_client_secret",
|
||||
OUTLOOK_OAUTH_CLIENT_ID="test_outlook_client_id",
|
||||
OUTLOOK_OAUTH_CLIENT_SECRET="test_outlook_client_secret",
|
||||
)
|
||||
class TestMailOAuth(
|
||||
TestCase,
|
||||
):
|
||||
def setUp(self) -> None:
|
||||
self.user = User.objects.create_user("testuser")
|
||||
self.user.user_permissions.add(
|
||||
*Permission.objects.filter(
|
||||
codename__in=[
|
||||
"add_mailaccount",
|
||||
],
|
||||
),
|
||||
pytest.param(
|
||||
{
|
||||
"OAUTH_CALLBACK_BASE_URL": None,
|
||||
"PAPERLESS_URL": "http://paperless.example.com",
|
||||
},
|
||||
)
|
||||
self.user.save()
|
||||
self.client.force_login(self.user)
|
||||
self.mail_account_handler = MailAccountHandler()
|
||||
super().setUp()
|
||||
|
||||
def test_generate_paths(self) -> None:
|
||||
"""
|
||||
GIVEN:
|
||||
- Mocked settings for OAuth callback and base URLs
|
||||
WHEN:
|
||||
- get_oauth_callback_url and get_oauth_redirect_url are called
|
||||
THEN:
|
||||
- Correct URLs are generated
|
||||
"""
|
||||
# Callback URL
|
||||
oauth_manager = PaperlessMailOAuth2Manager()
|
||||
with override_settings(OAUTH_CALLBACK_BASE_URL="http://paperless.example.com"):
|
||||
self.assertEqual(
|
||||
oauth_manager.oauth_callback_url,
|
||||
"http://paperless.example.com/api/oauth/callback/",
|
||||
id="falls-back-to-paperless-url",
|
||||
),
|
||||
pytest.param(
|
||||
{
|
||||
"OAUTH_CALLBACK_BASE_URL": None,
|
||||
"PAPERLESS_URL": "http://paperless.example.com",
|
||||
"BASE_URL": "/paperless/",
|
||||
},
|
||||
)
|
||||
with override_settings(
|
||||
OAUTH_CALLBACK_BASE_URL=None,
|
||||
PAPERLESS_URL="http://paperless.example.com",
|
||||
):
|
||||
self.assertEqual(
|
||||
oauth_manager.oauth_callback_url,
|
||||
"http://paperless.example.com/api/oauth/callback/",
|
||||
)
|
||||
with override_settings(
|
||||
OAUTH_CALLBACK_BASE_URL=None,
|
||||
PAPERLESS_URL="http://paperless.example.com",
|
||||
BASE_URL="/paperless/",
|
||||
):
|
||||
self.assertEqual(
|
||||
oauth_manager.oauth_callback_url,
|
||||
"http://paperless.example.com/paperless/api/oauth/callback/",
|
||||
id="respects-base-url-prefix",
|
||||
),
|
||||
],
|
||||
)
|
||||
def test_oauth_callback_url(
|
||||
self,
|
||||
settings: SettingsWrapper,
|
||||
oauth_manager: PaperlessMailOAuth2Manager,
|
||||
overrides: dict,
|
||||
expected: str,
|
||||
) -> None:
|
||||
"""
|
||||
GIVEN:
|
||||
- Various combinations of OAUTH_CALLBACK_BASE_URL, PAPERLESS_URL, and BASE_URL
|
||||
WHEN:
|
||||
- oauth_callback_url is read from the manager
|
||||
THEN:
|
||||
- The expected fully-qualified callback URL is produced
|
||||
"""
|
||||
for key, value in overrides.items():
|
||||
setattr(settings, key, value)
|
||||
assert oauth_manager.oauth_callback_url == expected
|
||||
)
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("debug", "expected"),
|
||||
[
|
||||
pytest.param(
|
||||
True,
|
||||
# Redirect URL
|
||||
with override_settings(DEBUG=True):
|
||||
self.assertEqual(
|
||||
oauth_manager.oauth_redirect_url,
|
||||
"http://localhost:4200/mail",
|
||||
id="debug-redirects-to-ng-dev",
|
||||
),
|
||||
pytest.param(False, "/mail", id="prod-redirects-to-relative-path"),
|
||||
],
|
||||
)
|
||||
with override_settings(DEBUG=False):
|
||||
self.assertEqual(
|
||||
oauth_manager.oauth_redirect_url,
|
||||
"/mail",
|
||||
)
|
||||
|
||||
@mock.patch(
|
||||
"paperless_mail.oauth.PaperlessMailOAuth2Manager.get_gmail_access_token",
|
||||
)
|
||||
def test_oauth_redirect_url(
|
||||
self,
|
||||
settings: SettingsWrapper,
|
||||
oauth_manager: PaperlessMailOAuth2Manager,
|
||||
debug: bool, # noqa: FBT001
|
||||
expected: str,
|
||||
) -> None:
|
||||
"""
|
||||
GIVEN:
|
||||
- DEBUG is toggled on or off
|
||||
WHEN:
|
||||
- oauth_redirect_url is read from the manager
|
||||
THEN:
|
||||
- In DEBUG mode the Angular dev server URL is returned, otherwise a relative path
|
||||
"""
|
||||
settings.DEBUG = debug
|
||||
assert oauth_manager.oauth_redirect_url == expected
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
class TestOAuthCallbackView:
|
||||
"""End-to-end behavior of the /api/oauth/callback/ endpoint."""
|
||||
|
||||
def test_no_code(
|
||||
self,
|
||||
client: Client,
|
||||
mail_user: User,
|
||||
oauth_settings: SettingsWrapper,
|
||||
) -> None:
|
||||
"""
|
||||
GIVEN:
|
||||
- OAuth client IDs and secrets configured
|
||||
WHEN:
|
||||
- The OAuth callback is called without a code parameter
|
||||
THEN:
|
||||
- 400 Bad Request is returned and no mail account is created
|
||||
"""
|
||||
response = client.get("/api/oauth/callback/")
|
||||
assert response.status_code == status.HTTP_400_BAD_REQUEST
|
||||
assert not MailAccount.objects.exists()
|
||||
|
||||
def test_invalid_state(
|
||||
self,
|
||||
client: Client,
|
||||
mail_user: User,
|
||||
oauth_settings: SettingsWrapper,
|
||||
) -> None:
|
||||
"""
|
||||
GIVEN:
|
||||
- OAuth client IDs and secrets configured
|
||||
WHEN:
|
||||
- The OAuth callback is called with a state that does not match the session
|
||||
THEN:
|
||||
- 400 Bad Request is returned and no mail account is created
|
||||
"""
|
||||
response = client.get(
|
||||
"/api/oauth/callback/?code=test_code&state=invalid_state",
|
||||
)
|
||||
assert response.status_code == status.HTTP_400_BAD_REQUEST
|
||||
assert not MailAccount.objects.exists()
|
||||
|
||||
def test_insufficient_permissions(
|
||||
self,
|
||||
client: Client,
|
||||
mail_user: User,
|
||||
oauth_settings: SettingsWrapper,
|
||||
) -> None:
|
||||
"""
|
||||
GIVEN:
|
||||
- OAuth client IDs and secrets configured
|
||||
- User without add_mailaccount permission
|
||||
WHEN:
|
||||
- The OAuth callback is called
|
||||
THEN:
|
||||
- 400 Bad Request is returned and no mail account is created
|
||||
"""
|
||||
mail_user.user_permissions.remove(
|
||||
*Permission.objects.filter(codename="add_mailaccount"),
|
||||
)
|
||||
|
||||
response = client.get(
|
||||
"/api/oauth/callback/?code=test_code&scope=https://mail.google.com/",
|
||||
)
|
||||
assert response.status_code == status.HTTP_400_BAD_REQUEST
|
||||
assert not MailAccount.objects.exists()
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("provider", "callback_query", "expected_imap"),
|
||||
[
|
||||
pytest.param(
|
||||
"gmail",
|
||||
"code=test_code&scope=https://mail.google.com/&state=test_state",
|
||||
"imap.gmail.com",
|
||||
id="gmail",
|
||||
),
|
||||
pytest.param(
|
||||
"outlook",
|
||||
"code=test_code&state=test_state",
|
||||
"outlook.office365.com",
|
||||
id="outlook",
|
||||
),
|
||||
],
|
||||
@mock.patch(
|
||||
"paperless_mail.oauth.PaperlessMailOAuth2Manager.get_outlook_access_token",
|
||||
)
|
||||
def test_success(
|
||||
def test_oauth_callback_view_success(
|
||||
self,
|
||||
client: Client,
|
||||
mail_user: User,
|
||||
oauth_settings: SettingsWrapper,
|
||||
oauth_session: Client,
|
||||
mocker: pytest_mock.MockerFixture,
|
||||
provider: str,
|
||||
callback_query: str,
|
||||
expected_imap: str,
|
||||
mock_get_outlook_access_token,
|
||||
mock_get_gmail_access_token,
|
||||
) -> None:
|
||||
"""
|
||||
GIVEN:
|
||||
- OAuth client IDs and secrets configured for Gmail and Outlook
|
||||
- A valid oauth_state seeded in the session
|
||||
- Mocked settings for Gmail and Outlook OAuth client IDs and secrets
|
||||
WHEN:
|
||||
- The OAuth callback is called with a code and provider-specific scope
|
||||
- OAuth callback is called with a code and scope
|
||||
- OAuth callback is called with a code and no scope
|
||||
THEN:
|
||||
- A redirect with oauth_success=1 is returned
|
||||
- The provider's access-token method is invoked
|
||||
- A mail account for the matching provider is created
|
||||
- Gmail mail account is created
|
||||
- Outlook mail account is created
|
||||
"""
|
||||
token_payload = {
|
||||
|
||||
mock_get_gmail_access_token.return_value = {
|
||||
"access_token": "test_access_token",
|
||||
"refresh_token": "test_refresh_token",
|
||||
"expires_in": 3600,
|
||||
}
|
||||
mocked = mocker.patch(
|
||||
f"paperless_mail.oauth.PaperlessMailOAuth2Manager.get_{provider}_access_token",
|
||||
return_value=token_payload,
|
||||
mock_get_outlook_access_token.return_value = {
|
||||
"access_token": "test_access_token",
|
||||
"refresh_token": "test_refresh_token",
|
||||
"expires_in": 3600,
|
||||
}
|
||||
|
||||
session = self.client.session
|
||||
session.update(
|
||||
{
|
||||
"oauth_state": "test_state",
|
||||
},
|
||||
)
|
||||
session.save()
|
||||
|
||||
# Test Google OAuth callback
|
||||
response = self.client.get(
|
||||
"/api/oauth/callback/?code=test_code&scope=https://mail.google.com/&state=test_state",
|
||||
)
|
||||
self.assertEqual(response.status_code, status.HTTP_302_FOUND)
|
||||
self.assertIn("oauth_success=1", response.url)
|
||||
mock_get_gmail_access_token.assert_called_once()
|
||||
self.assertTrue(
|
||||
MailAccount.objects.filter(imap_server="imap.gmail.com").exists(),
|
||||
)
|
||||
|
||||
response = client.get(f"/api/oauth/callback/?{callback_query}")
|
||||
# Test Outlook OAuth callback
|
||||
response = self.client.get(
|
||||
"/api/oauth/callback/?code=test_code&state=test_state",
|
||||
)
|
||||
self.assertEqual(response.status_code, status.HTTP_302_FOUND)
|
||||
self.assertIn("oauth_success=1", response.url)
|
||||
self.assertTrue(
|
||||
MailAccount.objects.filter(imap_server="outlook.office365.com").exists(),
|
||||
)
|
||||
|
||||
assert response.status_code == status.HTTP_302_FOUND
|
||||
assert "oauth_success=1" in response.url
|
||||
mocked.assert_called_once()
|
||||
assert MailAccount.objects.filter(imap_server=expected_imap).exists()
|
||||
@mock.patch("httpx_oauth.oauth2.BaseOAuth2.get_access_token")
|
||||
def test_oauth_callback_view_fails(self, mock_get_access_token) -> None:
|
||||
"""
|
||||
GIVEN:
|
||||
- Mocked settings for Gmail and Outlook OAuth client IDs and secrets
|
||||
WHEN:
|
||||
- OAuth callback is called and get access token returns an error
|
||||
THEN:
|
||||
- No mail account is created
|
||||
- Error is logged
|
||||
"""
|
||||
mock_get_access_token.side_effect = GetAccessTokenError("test_error")
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("callback_query", "imap_server"),
|
||||
[
|
||||
pytest.param(
|
||||
"code=test_code&scope=https://mail.google.com/&state=test_state",
|
||||
"imap.gmail.com",
|
||||
id="gmail",
|
||||
session = self.client.session
|
||||
session.update(
|
||||
{
|
||||
"oauth_state": "test_state",
|
||||
},
|
||||
)
|
||||
session.save()
|
||||
|
||||
with self.assertLogs("paperless_mail", level="ERROR") as cm:
|
||||
# Test Google OAuth callback
|
||||
response = self.client.get(
|
||||
"/api/oauth/callback/?code=test_code&scope=https://mail.google.com/&state=test_state",
|
||||
)
|
||||
self.assertEqual(response.status_code, status.HTTP_302_FOUND)
|
||||
self.assertIn("oauth_success=0", response.url)
|
||||
self.assertFalse(
|
||||
MailAccount.objects.filter(imap_server="imap.gmail.com").exists(),
|
||||
)
|
||||
|
||||
# Test Outlook OAuth callback
|
||||
response = self.client.get(
|
||||
"/api/oauth/callback/?code=test_code&state=test_state",
|
||||
)
|
||||
self.assertEqual(response.status_code, status.HTTP_302_FOUND)
|
||||
self.assertIn("oauth_success=0", response.url)
|
||||
self.assertFalse(
|
||||
MailAccount.objects.filter(
|
||||
imap_server="outlook.office365.com",
|
||||
).exists(),
|
||||
)
|
||||
|
||||
self.assertIn(
|
||||
"Error getting access token from OAuth provider",
|
||||
cm.output[0],
|
||||
)
|
||||
|
||||
def test_oauth_callback_view_insufficient_permissions(self) -> None:
|
||||
"""
|
||||
GIVEN:
|
||||
- Mocked settings for Gmail and Outlook OAuth client IDs and secrets
|
||||
- User without add_mailaccount permission
|
||||
WHEN:
|
||||
- OAuth callback is called
|
||||
THEN:
|
||||
- 400 bad request returned, no mail accounts are created
|
||||
"""
|
||||
self.user.user_permissions.remove(
|
||||
*Permission.objects.filter(
|
||||
codename__in=[
|
||||
"add_mailaccount",
|
||||
],
|
||||
),
|
||||
pytest.param(
|
||||
"code=test_code&state=test_state",
|
||||
"outlook.office365.com",
|
||||
id="outlook",
|
||||
),
|
||||
],
|
||||
)
|
||||
self.user.save()
|
||||
|
||||
response = self.client.get(
|
||||
"/api/oauth/callback/?code=test_code&scope=https://mail.google.com/",
|
||||
)
|
||||
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
|
||||
self.assertFalse(
|
||||
MailAccount.objects.filter(imap_server="imap.gmail.com").exists(),
|
||||
)
|
||||
self.assertFalse(
|
||||
MailAccount.objects.filter(imap_server="outlook.office365.com").exists(),
|
||||
)
|
||||
|
||||
def test_oauth_callback_view_no_code(self) -> None:
|
||||
"""
|
||||
GIVEN:
|
||||
- Mocked settings for Gmail and Outlook OAuth client IDs and secrets
|
||||
WHEN:
|
||||
- OAuth callback is called without a code
|
||||
THEN:
|
||||
- 400 bad request returned, no mail accounts are created
|
||||
"""
|
||||
|
||||
response = self.client.get(
|
||||
"/api/oauth/callback/",
|
||||
)
|
||||
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
|
||||
self.assertFalse(
|
||||
MailAccount.objects.filter(imap_server="imap.gmail.com").exists(),
|
||||
)
|
||||
self.assertFalse(
|
||||
MailAccount.objects.filter(imap_server="outlook.office365.com").exists(),
|
||||
)
|
||||
|
||||
def test_oauth_callback_view_invalid_state(self) -> None:
|
||||
"""
|
||||
GIVEN:
|
||||
- Mocked settings for Gmail and Outlook OAuth client IDs and secrets
|
||||
WHEN:
|
||||
- OAuth callback is called with an invalid state
|
||||
THEN:
|
||||
- 400 bad request returned, no mail accounts are created
|
||||
"""
|
||||
|
||||
response = self.client.get(
|
||||
"/api/oauth/callback/?code=test_code&state=invalid_state",
|
||||
)
|
||||
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
|
||||
self.assertFalse(
|
||||
MailAccount.objects.filter(imap_server="imap.gmail.com").exists(),
|
||||
)
|
||||
self.assertFalse(
|
||||
MailAccount.objects.filter(imap_server="outlook.office365.com").exists(),
|
||||
)
|
||||
|
||||
@mock.patch("paperless_mail.mail.get_mailbox")
|
||||
@mock.patch(
|
||||
"httpx_oauth.oauth2.BaseOAuth2.refresh_token",
|
||||
)
|
||||
def test_provider_error(
|
||||
def test_refresh_token_on_handle_mail_account(
|
||||
self,
|
||||
client: Client,
|
||||
mail_user: User,
|
||||
oauth_settings: SettingsWrapper,
|
||||
oauth_session: Client,
|
||||
mocker: pytest_mock.MockerFixture,
|
||||
caplog: pytest.LogCaptureFixture,
|
||||
callback_query: str,
|
||||
imap_server: str,
|
||||
mock_refresh_token,
|
||||
mock_get_mailbox,
|
||||
) -> None:
|
||||
"""
|
||||
GIVEN:
|
||||
- OAuth client IDs and secrets configured
|
||||
- The provider's access-token endpoint raises GetAccessTokenError
|
||||
- Mail account with refresh token and expiration
|
||||
WHEN:
|
||||
- The OAuth callback is called with a code (Gmail or Outlook)
|
||||
- handle_mail_account is called
|
||||
THEN:
|
||||
- A redirect with oauth_success=0 is returned
|
||||
- No mail account is created
|
||||
- The failure is logged at ERROR level
|
||||
- Refresh token is called
|
||||
"""
|
||||
mocker.patch(
|
||||
"httpx_oauth.oauth2.BaseOAuth2.get_access_token",
|
||||
side_effect=GetAccessTokenError("test_error"),
|
||||
)
|
||||
|
||||
with caplog.at_level("ERROR", logger="paperless_mail"):
|
||||
response = client.get(f"/api/oauth/callback/?{callback_query}")
|
||||
mock_mailbox = mock.MagicMock()
|
||||
mock_get_mailbox.return_value.__enter__.return_value = mock_mailbox
|
||||
|
||||
assert response.status_code == status.HTTP_302_FOUND
|
||||
assert "oauth_success=0" in response.url
|
||||
assert not MailAccount.objects.filter(imap_server=imap_server).exists()
|
||||
assert any(
|
||||
"Error getting access token from OAuth provider" in record.message
|
||||
for record in caplog.records
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def expired_oauth_account_factory(db: None, mocker: pytest_mock.MockerFixture):
|
||||
"""
|
||||
Build an OAuth-backed MailAccount whose access token has already expired,
|
||||
while patching `get_mailbox` so no real IMAP connection is attempted.
|
||||
"""
|
||||
mocker.patch(
|
||||
"paperless_mail.mail.get_mailbox",
|
||||
).return_value.__enter__.return_value = mocker.MagicMock()
|
||||
|
||||
def _make(account_type: MailAccount.MailAccountType) -> MailAccount:
|
||||
return MailAccountFactory(
|
||||
mail_account = MailAccountFactory(
|
||||
name="Test Gmail Mail Account",
|
||||
username="test_username",
|
||||
account_type=account_type,
|
||||
account_type=MailAccount.MailAccountType.GMAIL_OAUTH,
|
||||
is_token=True,
|
||||
refresh_token="test_refresh_token",
|
||||
expiration=timezone.now() - timedelta(days=1),
|
||||
)
|
||||
|
||||
return _make
|
||||
mock_refresh_token.return_value = {
|
||||
"access_token": "test_access_token",
|
||||
"refresh_token": "test_refresh_token",
|
||||
"expires_in": 3600,
|
||||
}
|
||||
|
||||
self.mail_account_handler.handle_mail_account(mail_account)
|
||||
mock_refresh_token.assert_called_once()
|
||||
mock_refresh_token.reset_mock()
|
||||
|
||||
@pytest.mark.django_db
|
||||
class TestRefreshTokenOnHandleMailAccount:
|
||||
"""OAuth refresh-token flow exercised through MailAccountHandler.handle_mail_account."""
|
||||
mock_refresh_token.return_value = {
|
||||
"access_token": "test_access_token",
|
||||
"refresh_token": "test_refresh",
|
||||
"expires_in": 3600,
|
||||
}
|
||||
outlook_mail_account = MailAccountFactory(
|
||||
name="Test Outlook Mail Account",
|
||||
username="test_username",
|
||||
account_type=MailAccount.MailAccountType.OUTLOOK_OAUTH,
|
||||
is_token=True,
|
||||
refresh_token="test_refresh_token",
|
||||
expiration=timezone.now() - timedelta(days=1),
|
||||
)
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"account_type",
|
||||
[
|
||||
pytest.param(MailAccount.MailAccountType.GMAIL_OAUTH, id="gmail"),
|
||||
pytest.param(MailAccount.MailAccountType.OUTLOOK_OAUTH, id="outlook"),
|
||||
],
|
||||
self.mail_account_handler.handle_mail_account(outlook_mail_account)
|
||||
mock_refresh_token.assert_called_once()
|
||||
|
||||
@mock.patch("paperless_mail.mail.get_mailbox")
|
||||
@mock.patch(
|
||||
"httpx_oauth.oauth2.BaseOAuth2.refresh_token",
|
||||
)
|
||||
def test_refresh_token_called(
|
||||
def test_refresh_token_on_handle_mail_account_fails(
|
||||
self,
|
||||
mocker: pytest_mock.MockerFixture,
|
||||
mail_account_handler: MailAccountHandler,
|
||||
expired_oauth_account_factory,
|
||||
account_type: MailAccount.MailAccountType,
|
||||
mock_refresh_token,
|
||||
mock_get_mailbox,
|
||||
) -> None:
|
||||
"""
|
||||
GIVEN:
|
||||
- An OAuth-backed mail account with a refresh token and an expired access token
|
||||
WHEN:
|
||||
- handle_mail_account is called
|
||||
THEN:
|
||||
- The OAuth refresh_token endpoint is invoked exactly once
|
||||
"""
|
||||
mock_refresh = mocker.patch(
|
||||
"httpx_oauth.oauth2.BaseOAuth2.refresh_token",
|
||||
return_value={
|
||||
"access_token": "test_access_token",
|
||||
"refresh_token": "test_refresh_token",
|
||||
"expires_in": 3600,
|
||||
},
|
||||
)
|
||||
|
||||
mail_account_handler.handle_mail_account(
|
||||
expired_oauth_account_factory(account_type),
|
||||
)
|
||||
mock_refresh.assert_called_once()
|
||||
|
||||
def test_refresh_token_failure(
|
||||
self,
|
||||
mocker: pytest_mock.MockerFixture,
|
||||
caplog: pytest.LogCaptureFixture,
|
||||
mail_account_handler: MailAccountHandler,
|
||||
expired_oauth_account_factory,
|
||||
) -> None:
|
||||
"""
|
||||
GIVEN:
|
||||
- An OAuth-backed mail account with a refresh token and an expired access token
|
||||
- The OAuth refresh_token endpoint raises RefreshTokenError
|
||||
- Mail account with refresh token and expiration
|
||||
WHEN:
|
||||
- handle_mail_account is called
|
||||
- Refresh token is called but fails
|
||||
THEN:
|
||||
- Error is logged
|
||||
- 0 processed mails is returned
|
||||
- The failure is logged at ERROR level with the account context
|
||||
"""
|
||||
mock_refresh = mocker.patch(
|
||||
"httpx_oauth.oauth2.BaseOAuth2.refresh_token",
|
||||
side_effect=RefreshTokenError("test_error"),
|
||||
|
||||
mock_mailbox = mock.MagicMock()
|
||||
mock_get_mailbox.return_value.__enter__.return_value = mock_mailbox
|
||||
|
||||
mail_account = MailAccountFactory(
|
||||
name="Test Gmail Mail Account",
|
||||
username="test_username",
|
||||
account_type=MailAccount.MailAccountType.GMAIL_OAUTH,
|
||||
is_token=True,
|
||||
refresh_token="test_refresh_token",
|
||||
expiration=timezone.now() - timedelta(days=1),
|
||||
)
|
||||
|
||||
account = expired_oauth_account_factory(
|
||||
MailAccount.MailAccountType.GMAIL_OAUTH,
|
||||
)
|
||||
mock_refresh_token.side_effect = RefreshTokenError("test_error")
|
||||
|
||||
with caplog.at_level("ERROR", logger="paperless_mail"):
|
||||
result = mail_account_handler.handle_mail_account(account)
|
||||
|
||||
assert result == 0
|
||||
mock_refresh.assert_called_once()
|
||||
assert any(
|
||||
f"Failed to refresh oauth token for account {account}: test_error"
|
||||
in record.message
|
||||
for record in caplog.records
|
||||
)
|
||||
with self.assertLogs("paperless_mail", level="ERROR") as cm:
|
||||
# returns 0 processed mails
|
||||
self.assertEqual(
|
||||
self.mail_account_handler.handle_mail_account(mail_account),
|
||||
0,
|
||||
)
|
||||
mock_refresh_token.assert_called_once()
|
||||
self.assertIn(
|
||||
f"Failed to refresh oauth token for account {mail_account}: test_error",
|
||||
cm.output[0],
|
||||
)
|
||||
|
||||
@@ -1,68 +1,64 @@
|
||||
import email
|
||||
import email.contentmanager
|
||||
import shutil
|
||||
import subprocess
|
||||
from collections.abc import Generator
|
||||
import tempfile
|
||||
from email.message import Message
|
||||
from email.mime.application import MIMEApplication
|
||||
from email.mime.multipart import MIMEMultipart
|
||||
from pathlib import Path
|
||||
from unittest import mock
|
||||
|
||||
import gnupg
|
||||
import pytest
|
||||
from django.test import override_settings
|
||||
from imap_tools import MailMessage
|
||||
from pytest_django.fixtures import SettingsWrapper
|
||||
from pytest_mock import MockerFixture
|
||||
|
||||
from paperless_mail.mail import MailAccountHandler
|
||||
from paperless_mail.models import MailRule
|
||||
from paperless_mail.preprocessor import MailMessageDecryptor
|
||||
from paperless_mail.tests.factories import MailAccountFactory
|
||||
from paperless_mail.tests.test_mail import MailMocker
|
||||
from paperless_mail.tests.test_mail import TestMail
|
||||
from paperless_mail.tests.test_mail import _AttachmentDef
|
||||
|
||||
|
||||
def _kill_gpg_agent(gpg_home: str) -> None:
|
||||
"""
|
||||
Terminate any gpg-agent attached to `gpg_home` so the directory is
|
||||
removable. Uses `gpgconf --kill`, the GnuPG project's recommended cleanup
|
||||
path; python-gnupg has no built-in cleanup since it only wraps the CLI.
|
||||
"""
|
||||
try:
|
||||
subprocess.run(
|
||||
["gpgconf", "--kill", "gpg-agent"],
|
||||
env={"GNUPGHOME": gpg_home},
|
||||
check=False,
|
||||
capture_output=True,
|
||||
timeout=5,
|
||||
)
|
||||
except (FileNotFoundError, subprocess.TimeoutExpired):
|
||||
pass
|
||||
|
||||
|
||||
class MessageEncryptor:
|
||||
"""
|
||||
Test helper: generates a throwaway GPG keypair in a tempdir and exposes
|
||||
`encrypt(MailMessage) -> MailMessage`.
|
||||
"""
|
||||
|
||||
TEST_USER = "testuser@example.com"
|
||||
|
||||
def __init__(self, gpg_home: Path) -> None:
|
||||
self.gpg_home = str(gpg_home)
|
||||
def __init__(self) -> None:
|
||||
self.gpg_home = tempfile.mkdtemp()
|
||||
self.gpg = gnupg.GPG(gnupghome=self.gpg_home)
|
||||
self.gpg.gen_key(
|
||||
self.gpg.gen_key_input(
|
||||
name_email=self.TEST_USER,
|
||||
passphrase=None,
|
||||
key_type="RSA",
|
||||
key_length=2048,
|
||||
expire_date=0,
|
||||
no_protection=True,
|
||||
),
|
||||
self._testUser = "testuser@example.com"
|
||||
# Generate a new key
|
||||
input_data = self.gpg.gen_key_input(
|
||||
name_email=self._testUser,
|
||||
passphrase=None,
|
||||
key_type="RSA",
|
||||
key_length=2048,
|
||||
expire_date=0,
|
||||
no_protection=True,
|
||||
)
|
||||
self.gpg.gen_key(input_data)
|
||||
|
||||
def kill_agent(self) -> None:
|
||||
_kill_gpg_agent(self.gpg_home)
|
||||
def cleanup(self) -> None:
|
||||
"""
|
||||
Kill the gpg-agent process and clean up the temporary GPG home directory.
|
||||
|
||||
This uses gpgconf to properly terminate the agent, which is the officially
|
||||
recommended cleanup method from the GnuPG project. python-gnupg does not
|
||||
provide built-in cleanup methods as it's only a wrapper around the gpg CLI.
|
||||
"""
|
||||
# Kill the gpg-agent using the official GnuPG cleanup tool
|
||||
try:
|
||||
subprocess.run(
|
||||
["gpgconf", "--kill", "gpg-agent"],
|
||||
env={"GNUPGHOME": self.gpg_home},
|
||||
check=False,
|
||||
capture_output=True,
|
||||
timeout=5,
|
||||
)
|
||||
except (FileNotFoundError, subprocess.TimeoutExpired):
|
||||
# gpgconf not found or hung - agent will timeout eventually
|
||||
pass
|
||||
|
||||
# Clean up the temporary directory
|
||||
shutil.rmtree(self.gpg_home, ignore_errors=True)
|
||||
|
||||
@staticmethod
|
||||
def get_email_body_without_headers(email_message: Message) -> bytes:
|
||||
@@ -70,6 +66,7 @@ class MessageEncryptor:
|
||||
Filters some relevant headers from an EmailMessage and returns just the body.
|
||||
"""
|
||||
message_copy = email.message_from_bytes(email_message.as_bytes())
|
||||
|
||||
message_copy._headers = [
|
||||
header
|
||||
for header in message_copy._headers
|
||||
@@ -77,235 +74,203 @@ class MessageEncryptor:
|
||||
]
|
||||
return message_copy.as_bytes()
|
||||
|
||||
def encrypt(self, message: MailMessage) -> MailMessage:
|
||||
original_email: Message = message.obj
|
||||
def encrypt(self, message):
|
||||
original_email: email.message.Message = message.obj
|
||||
encrypted_data = self.gpg.encrypt(
|
||||
self.get_email_body_without_headers(original_email),
|
||||
self.TEST_USER,
|
||||
self._testUser,
|
||||
armor=True,
|
||||
)
|
||||
if not encrypted_data.ok:
|
||||
raise Exception(f"Encryption failed: {encrypted_data.stderr}")
|
||||
encrypted_email_content = encrypted_data.data
|
||||
|
||||
new_email = MIMEMultipart("encrypted", protocol="application/pgp-encrypted")
|
||||
new_email["From"] = original_email["From"]
|
||||
new_email["To"] = original_email["To"]
|
||||
new_email["Subject"] = original_email["Subject"]
|
||||
|
||||
# Add the control part
|
||||
control_part = MIMEApplication(_data=b"", _subtype="pgp-encrypted")
|
||||
control_part.set_payload("Version: 1")
|
||||
new_email.attach(control_part)
|
||||
|
||||
# Add the encrypted data part
|
||||
encrypted_part = MIMEApplication(_data=b"", _subtype="octet-stream")
|
||||
encrypted_part.set_payload(encrypted_data.data.decode("ascii"))
|
||||
encrypted_part.set_payload(encrypted_email_content.decode("ascii"))
|
||||
encrypted_part.add_header(
|
||||
"Content-Disposition",
|
||||
'attachment; filename="encrypted.asc"',
|
||||
)
|
||||
new_email.attach(encrypted_part)
|
||||
|
||||
return MailMessage(
|
||||
encrypted_message: MailMessage = MailMessage(
|
||||
[(f"UID {message.uid}".encode(), new_email.as_bytes())],
|
||||
)
|
||||
return encrypted_message
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def message_encryptor(
|
||||
tmp_path_factory: pytest.TempPathFactory,
|
||||
) -> Generator[MessageEncryptor, None, None]:
|
||||
"""
|
||||
Session-scoped: GPG keypair generation is slow (~1s+), and nothing in
|
||||
these tests mutates the keyring after creation. The GPG home directory
|
||||
comes from `tmp_path_factory` so pytest cleans it up at session end;
|
||||
we still kill the gpg-agent ourselves so the dir is removable.
|
||||
"""
|
||||
encryptor = MessageEncryptor(tmp_path_factory.mktemp("gpg-home"))
|
||||
yield encryptor
|
||||
encryptor.kill_agent()
|
||||
class TestMailMessageGpgDecryptor(TestMail):
|
||||
@classmethod
|
||||
def setUpClass(cls) -> None:
|
||||
"""Create GPG encryptor once for all tests in this class."""
|
||||
super().setUpClass()
|
||||
cls.messageEncryptor = MessageEncryptor()
|
||||
|
||||
@classmethod
|
||||
def tearDownClass(cls) -> None:
|
||||
"""Clean up GPG resources after all tests complete."""
|
||||
if hasattr(cls, "messageEncryptor"):
|
||||
cls.messageEncryptor.cleanup()
|
||||
super().tearDownClass()
|
||||
|
||||
@pytest.fixture
|
||||
def gpg_settings(
|
||||
settings: SettingsWrapper,
|
||||
message_encryptor: MessageEncryptor,
|
||||
) -> SettingsWrapper:
|
||||
settings.EMAIL_GNUPG_HOME = message_encryptor.gpg_home
|
||||
settings.EMAIL_ENABLE_GPG_DECRYPTOR = True
|
||||
return settings
|
||||
def setUp(self) -> None:
|
||||
with override_settings(
|
||||
EMAIL_GNUPG_HOME=self.messageEncryptor.gpg_home,
|
||||
EMAIL_ENABLE_GPG_DECRYPTOR=True,
|
||||
):
|
||||
super().setUp()
|
||||
|
||||
def test_preprocessor_is_able_to_run(self) -> None:
|
||||
with override_settings(
|
||||
EMAIL_GNUPG_HOME=self.messageEncryptor.gpg_home,
|
||||
EMAIL_ENABLE_GPG_DECRYPTOR=True,
|
||||
):
|
||||
self.assertTrue(MailMessageDecryptor.able_to_run())
|
||||
|
||||
@pytest.fixture
|
||||
def encrypted_pair(
|
||||
mail_mocker: MailMocker,
|
||||
message_encryptor: MessageEncryptor,
|
||||
) -> tuple[MailMessage, MailMessage]:
|
||||
"""
|
||||
Build a (encrypted, plaintext) MailMessage pair sharing the same UID and
|
||||
headers, with two PDF attachments on the plaintext side.
|
||||
"""
|
||||
plaintext = mail_mocker.messageBuilder.create_message(
|
||||
body="Test message with 2 attachments",
|
||||
attachments=[
|
||||
_AttachmentDef(filename="f1.pdf", disposition="inline"),
|
||||
_AttachmentDef(filename="f2.pdf"),
|
||||
],
|
||||
)
|
||||
return message_encryptor.encrypt(plaintext), plaintext
|
||||
def test_preprocessor_is_able_to_run2(self) -> None:
|
||||
with override_settings(
|
||||
EMAIL_GNUPG_HOME=None,
|
||||
EMAIL_ENABLE_GPG_DECRYPTOR=True,
|
||||
):
|
||||
self.assertTrue(MailMessageDecryptor.able_to_run())
|
||||
|
||||
def test_is_not_able_to_run_disabled(self) -> None:
|
||||
with override_settings(
|
||||
EMAIL_ENABLE_GPG_DECRYPTOR=False,
|
||||
):
|
||||
self.assertFalse(MailMessageDecryptor.able_to_run())
|
||||
|
||||
# Sentinel used in `test_able_to_run` parametrization to request the real
|
||||
# GPG home from the session-scoped `message_encryptor` fixture at runtime.
|
||||
_VALID_GPG_HOME = object()
|
||||
def test_is_not_able_to_run_bogus_path(self) -> None:
|
||||
with override_settings(
|
||||
EMAIL_ENABLE_GPG_DECRYPTOR=True,
|
||||
EMAIL_GNUPG_HOME="_)@# notapath &%#$",
|
||||
):
|
||||
self.assertFalse(MailMessageDecryptor.able_to_run())
|
||||
|
||||
|
||||
class TestMailMessageDecryptorAbleToRun:
|
||||
"""`MailMessageDecryptor.able_to_run()` configuration matrix."""
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("settings_overrides", "expected"),
|
||||
[
|
||||
pytest.param(
|
||||
{
|
||||
"EMAIL_GNUPG_HOME": _VALID_GPG_HOME,
|
||||
"EMAIL_ENABLE_GPG_DECRYPTOR": True,
|
||||
},
|
||||
True,
|
||||
id="enabled-with-valid-home",
|
||||
def test_fails_at_initialization(self) -> None:
|
||||
with (
|
||||
mock.patch("gnupg.GPG.__init__") as mock_run,
|
||||
override_settings(
|
||||
EMAIL_ENABLE_GPG_DECRYPTOR=True,
|
||||
),
|
||||
pytest.param(
|
||||
{"EMAIL_GNUPG_HOME": None, "EMAIL_ENABLE_GPG_DECRYPTOR": True},
|
||||
True,
|
||||
id="enabled-with-default-home",
|
||||
),
|
||||
pytest.param(
|
||||
{"EMAIL_ENABLE_GPG_DECRYPTOR": False},
|
||||
False,
|
||||
id="disabled",
|
||||
),
|
||||
pytest.param(
|
||||
{
|
||||
"EMAIL_ENABLE_GPG_DECRYPTOR": True,
|
||||
"EMAIL_GNUPG_HOME": "_)@# notapath &%#$",
|
||||
},
|
||||
False,
|
||||
id="enabled-with-bogus-path",
|
||||
),
|
||||
],
|
||||
)
|
||||
def test_able_to_run(
|
||||
self,
|
||||
settings: SettingsWrapper,
|
||||
message_encryptor: MessageEncryptor,
|
||||
settings_overrides: dict,
|
||||
*,
|
||||
expected: bool,
|
||||
) -> None:
|
||||
for key, value in settings_overrides.items():
|
||||
if value is _VALID_GPG_HOME:
|
||||
value = message_encryptor.gpg_home
|
||||
setattr(settings, key, value)
|
||||
assert MailMessageDecryptor.able_to_run() is expected
|
||||
):
|
||||
|
||||
def side_effect(*args, **kwargs):
|
||||
raise OSError("Cannot find 'gpg' binary")
|
||||
|
||||
@pytest.mark.django_db
|
||||
class TestMailMessageDecryptor:
|
||||
"""End-to-end decrypt and consumption flow with a real GPG keyring."""
|
||||
mock_run.side_effect = side_effect
|
||||
|
||||
def test_fails_at_initialization(
|
||||
self,
|
||||
settings: SettingsWrapper,
|
||||
mocker: MockerFixture,
|
||||
) -> None:
|
||||
settings.EMAIL_ENABLE_GPG_DECRYPTOR = True
|
||||
mocker.patch(
|
||||
"gnupg.GPG.__init__",
|
||||
side_effect=OSError("Cannot find 'gpg' binary"),
|
||||
)
|
||||
|
||||
handler = MailAccountHandler()
|
||||
|
||||
assert len(handler._message_preprocessors) == 0
|
||||
|
||||
def test_decrypt_fails(
|
||||
self,
|
||||
settings: SettingsWrapper,
|
||||
encrypted_pair: tuple[MailMessage, MailMessage],
|
||||
tmp_path: Path,
|
||||
) -> None:
|
||||
"""
|
||||
A decryptor pointed at a fresh empty GPG home cannot decrypt the
|
||||
message — ensure it surfaces an exception rather than silently passing
|
||||
bytes through.
|
||||
"""
|
||||
encrypted_message, _ = encrypted_pair
|
||||
empty_gpg_home = tmp_path / "empty-gpg-home"
|
||||
empty_gpg_home.mkdir()
|
||||
|
||||
settings.EMAIL_ENABLE_GPG_DECRYPTOR = True
|
||||
settings.EMAIL_GNUPG_HOME = str(empty_gpg_home)
|
||||
handler = MailAccountHandler()
|
||||
self.assertEqual(len(handler._message_preprocessors), 0)
|
||||
|
||||
def test_decrypt_fails(self) -> None:
|
||||
encrypted_message, _ = self.create_encrypted_unencrypted_message_pair()
|
||||
# This test creates its own empty GPG home to test decryption failure
|
||||
empty_gpg_home = tempfile.mkdtemp()
|
||||
try:
|
||||
with pytest.raises(Exception):
|
||||
MailMessageDecryptor().run(encrypted_message)
|
||||
with override_settings(
|
||||
EMAIL_ENABLE_GPG_DECRYPTOR=True,
|
||||
EMAIL_GNUPG_HOME=empty_gpg_home,
|
||||
):
|
||||
message_decryptor = MailMessageDecryptor()
|
||||
self.assertRaises(Exception, message_decryptor.run, encrypted_message)
|
||||
finally:
|
||||
_kill_gpg_agent(str(empty_gpg_home))
|
||||
# Clean up the temporary GPG home used only by this test
|
||||
try:
|
||||
subprocess.run(
|
||||
["gpgconf", "--kill", "gpg-agent"],
|
||||
env={"GNUPGHOME": empty_gpg_home},
|
||||
check=False,
|
||||
capture_output=True,
|
||||
timeout=5,
|
||||
)
|
||||
except (FileNotFoundError, subprocess.TimeoutExpired):
|
||||
pass
|
||||
shutil.rmtree(empty_gpg_home, ignore_errors=True)
|
||||
|
||||
def test_decrypt_encrypted_mail(
|
||||
self,
|
||||
gpg_settings: SettingsWrapper,
|
||||
encrypted_pair: tuple[MailMessage, MailMessage],
|
||||
) -> None:
|
||||
def test_decrypt_encrypted_mail(self) -> None:
|
||||
"""
|
||||
Creates a mail with attachments. Then encrypts it with a new key.
|
||||
Verifies that this encrypted message can be decrypted with attachments intact.
|
||||
"""
|
||||
encrypted_message, plaintext = encrypted_pair
|
||||
encrypted_message, message = self.create_encrypted_unencrypted_message_pair()
|
||||
headers = message.headers
|
||||
text = message.text
|
||||
|
||||
assert len(encrypted_message.attachments) == 1
|
||||
assert encrypted_message.attachments[0].filename == "encrypted.asc"
|
||||
assert encrypted_message.text == ""
|
||||
self.assertEqual(len(encrypted_message.attachments), 1)
|
||||
self.assertEqual(encrypted_message.attachments[0].filename, "encrypted.asc")
|
||||
self.assertEqual(encrypted_message.text, "")
|
||||
|
||||
decryptor = MailMessageDecryptor()
|
||||
assert decryptor.able_to_run()
|
||||
decrypted = decryptor.run(encrypted_message)
|
||||
with override_settings(
|
||||
EMAIL_ENABLE_GPG_DECRYPTOR=True,
|
||||
EMAIL_GNUPG_HOME=self.messageEncryptor.gpg_home,
|
||||
):
|
||||
message_decryptor = MailMessageDecryptor()
|
||||
self.assertTrue(message_decryptor.able_to_run())
|
||||
decrypted_message = message_decryptor.run(encrypted_message)
|
||||
|
||||
assert len(decrypted.attachments) == 2
|
||||
assert decrypted.attachments[0].filename == "f1.pdf"
|
||||
assert decrypted.attachments[1].filename == "f2.pdf"
|
||||
assert decrypted.headers == plaintext.headers
|
||||
assert decrypted.text == plaintext.text
|
||||
assert decrypted.uid == plaintext.uid
|
||||
self.assertEqual(len(decrypted_message.attachments), 2)
|
||||
self.assertEqual(decrypted_message.attachments[0].filename, "f1.pdf")
|
||||
self.assertEqual(decrypted_message.attachments[1].filename, "f2.pdf")
|
||||
self.assertEqual(decrypted_message.headers, headers)
|
||||
self.assertEqual(decrypted_message.text, text)
|
||||
self.assertEqual(decrypted_message.uid, message.uid)
|
||||
|
||||
def test_handle_encrypted_message(
|
||||
self,
|
||||
gpg_settings: SettingsWrapper,
|
||||
mail_mocker: MailMocker,
|
||||
message_encryptor: MessageEncryptor,
|
||||
) -> None:
|
||||
plaintext = mail_mocker.messageBuilder.create_message(
|
||||
def create_encrypted_unencrypted_message_pair(self):
|
||||
message = self.mailMocker.messageBuilder.create_message(
|
||||
body="Test message with 2 attachments",
|
||||
attachments=[
|
||||
_AttachmentDef(
|
||||
filename="f1.pdf",
|
||||
disposition="inline",
|
||||
),
|
||||
_AttachmentDef(filename="f2.pdf"),
|
||||
],
|
||||
)
|
||||
encrypted_message = self.messageEncryptor.encrypt(message)
|
||||
return encrypted_message, message
|
||||
|
||||
def test_handle_encrypted_message(self) -> None:
|
||||
message = self.mailMocker.messageBuilder.create_message(
|
||||
subject="the message title",
|
||||
from_="Myself",
|
||||
attachments=2,
|
||||
body="Test mail",
|
||||
)
|
||||
encrypted = message_encryptor.encrypt(plaintext)
|
||||
|
||||
rule = MailRule.objects.create(
|
||||
encrypted_message = self.messageEncryptor.encrypt(message)
|
||||
|
||||
account = MailAccountFactory()
|
||||
rule = MailRule(
|
||||
assign_title_from=MailRule.TitleSource.FROM_FILENAME,
|
||||
consumption_scope=MailRule.ConsumptionScope.EVERYTHING,
|
||||
account=MailAccountFactory(),
|
||||
account=account,
|
||||
)
|
||||
rule.save()
|
||||
|
||||
result = MailAccountHandler()._handle_message(encrypted, rule)
|
||||
result = self.mail_account_handler._handle_message(encrypted_message, rule)
|
||||
|
||||
assert result == 3
|
||||
mail_mocker._queue_consumption_tasks_mock.assert_called()
|
||||
mail_mocker.assert_queue_consumption_tasks_call_args(
|
||||
self.assertEqual(result, 3)
|
||||
|
||||
self.mailMocker._queue_consumption_tasks_mock.assert_called()
|
||||
|
||||
self.mailMocker.assert_queue_consumption_tasks_call_args(
|
||||
[
|
||||
[
|
||||
{
|
||||
"override_title": plaintext.subject,
|
||||
"override_filename": f"{plaintext.subject}.eml",
|
||||
"override_title": message.subject,
|
||||
"override_filename": f"{message.subject}.eml",
|
||||
},
|
||||
],
|
||||
[
|
||||
|
||||
Reference in New Issue
Block a user