Compare commits

..

7 Commits

Author SHA1 Message Date
stumpylog a049174e86 Some little fixes around factories 2026-04-30 15:21:40 -07:00
stumpylog a204f7986c Simplifications and improvements 2026-04-30 15:18:22 -07:00
Trenton H 2c8e4bd822 test(mail): use pytest tmp_path for MessageEncryptor GPG homes 2026-04-30 15:18:22 -07:00
Trenton H 05b9ac0ddb test(mail): convert test_preprocessor.py to pytest style 2026-04-30 15:18:22 -07:00
Trenton H ff7be6371b test(mail): add mail_mocker fixture wrapping MailMocker helper 2026-04-30 15:18:22 -07:00
Trenton H 9e518671d3 test(mail): convert test_mail_oauth.py to pytest style 2026-04-30 15:18:22 -07:00
Trenton H fa39c60358 test(mail): add mail_user and oauth_settings pytest fixtures 2026-04-30 15:18:22 -07:00
8 changed files with 1066 additions and 1052 deletions
+15 -15
View File
@@ -11,17 +11,17 @@
},
"private": true,
"dependencies": {
"@angular/cdk": "^21.2.8",
"@angular/common": "~21.2.10",
"@angular/compiler": "~21.2.10",
"@angular/core": "~21.2.10",
"@angular/forms": "~21.2.10",
"@angular/localize": "~21.2.10",
"@angular/platform-browser": "~21.2.10",
"@angular/platform-browser-dynamic": "~21.2.10",
"@angular/router": "~21.2.10",
"@angular/cdk": "^21.2.4",
"@angular/common": "~21.2.6",
"@angular/compiler": "~21.2.6",
"@angular/core": "~21.2.6",
"@angular/forms": "~21.2.6",
"@angular/localize": "~21.2.6",
"@angular/platform-browser": "~21.2.6",
"@angular/platform-browser-dynamic": "~21.2.6",
"@angular/router": "~21.2.6",
"@ng-bootstrap/ng-bootstrap": "^20.0.0",
"@ng-select/ng-select": "^21.8.0",
"@ng-select/ng-select": "^21.7.0",
"@ngneat/dirty-check-forms": "^3.0.3",
"@popperjs/core": "^2.11.8",
"bootstrap": "^5.3.8",
@@ -42,16 +42,16 @@
"devDependencies": {
"@angular-builders/custom-webpack": "^21.0.3",
"@angular-builders/jest": "^21.0.3",
"@angular-devkit/core": "^21.2.8",
"@angular-devkit/schematics": "^21.2.8",
"@angular-devkit/core": "^21.2.6",
"@angular-devkit/schematics": "^21.2.6",
"@angular-eslint/builder": "21.3.1",
"@angular-eslint/eslint-plugin": "21.3.1",
"@angular-eslint/eslint-plugin-template": "21.3.1",
"@angular-eslint/schematics": "21.3.1",
"@angular-eslint/template-parser": "21.3.1",
"@angular/build": "^21.2.8",
"@angular/cli": "~21.2.8",
"@angular/compiler-cli": "~21.2.10",
"@angular/build": "^21.2.6",
"@angular/cli": "~21.2.6",
"@angular/compiler-cli": "~21.2.6",
"@codecov/webpack-plugin": "^1.9.1",
"@playwright/test": "^1.59.0",
"@types/jest": "^30.0.0",
+479 -561
View File
File diff suppressed because it is too large Load Diff
+3 -2
View File
@@ -12,6 +12,7 @@ from pytest_django.fixtures import SettingsWrapper
from rest_framework.test import APIClient
from documents.tests.factories import DocumentFactory
from documents.tests.factories import UserFactory
UserModelT = get_user_model()
@@ -129,9 +130,9 @@ def rest_api_client():
@pytest.fixture()
def regular_user(django_user_model: type[UserModelT]) -> UserModelT:
def regular_user(db) -> UserModelT:
"""Unprivileged authenticated user for permission boundary tests."""
return django_user_model.objects.create_user(username="regular", password="regular")
return UserFactory.create()
@pytest.fixture()
+1 -1
View File
@@ -35,7 +35,7 @@ def user_admin() -> PaperlessUserAdmin:
@pytest.fixture
def staff_user(db) -> User:
return UserFactory.create(username="staff", staff=True)
return UserFactory.create(staff=True)
class TestDocumentAdmin(DirectoriesMixin, TestCase):
+1 -1
View File
@@ -649,7 +649,7 @@ class TestDocumentConsumptionFinishedSignal:
assert doc_with_keyword.correspondent == correspondent
def test_correspondent_not_applied(self, doc_with_keyword: Document) -> None:
TagFactory.create(
CorrespondentFactory.create(
match="no-match",
matching_algorithm=MatchingModel.MATCH_ANY,
)
+48 -2
View File
@@ -1,13 +1,18 @@
from collections.abc import Generator
import pytest
from django.contrib.auth.models import Permission
from django.contrib.auth.models import User
from django.test import Client
from pytest_django.fixtures import SettingsWrapper
from paperless_mail.mail import MailAccountHandler
from paperless_mail.models import MailAccount
from paperless_mail.tests.factories import MailAccountFactory
from paperless_mail.tests.test_mail import MailMocker
@pytest.fixture()
@pytest.fixture
def greenmail_mail_account(db: None) -> Generator[MailAccount, None, None]:
"""
Create a mail account configured for local Greenmail server.
@@ -24,6 +29,47 @@ def greenmail_mail_account(db: None) -> Generator[MailAccount, None, None]:
account.delete()
@pytest.fixture()
@pytest.fixture
def mail_account_handler() -> MailAccountHandler:
return MailAccountHandler()
@pytest.fixture
def mail_user(db: None, django_user_model, client: Client) -> User:
"""
Create a user with the `add_mailaccount` permission and log them in via
the test client. Returned so tests can mutate permissions if needed.
"""
user = django_user_model.objects.create_user("testuser")
user.user_permissions.add(*Permission.objects.filter(codename="add_mailaccount"))
client.force_login(user)
return user
@pytest.fixture
def oauth_settings(settings: SettingsWrapper) -> SettingsWrapper:
"""
Apply the OAuth callback / client-id settings the OAuth flow needs. Uses
pytest-django's `settings` fixture so values are reverted automatically.
"""
settings.OAUTH_CALLBACK_BASE_URL = "http://localhost:8000"
settings.GMAIL_OAUTH_CLIENT_ID = "test_gmail_client_id"
settings.GMAIL_OAUTH_CLIENT_SECRET = "test_gmail_client_secret"
settings.OUTLOOK_OAUTH_CLIENT_ID = "test_outlook_client_id"
settings.OUTLOOK_OAUTH_CLIENT_SECRET = "test_outlook_client_secret"
return settings
@pytest.fixture
def mail_mocker(db: None) -> Generator[MailMocker, None, None]:
"""
Provides a MailMocker instance with its `MailBox` and
`queue_consumption_tasks` patches active. Cleanups registered via
TestCase.addCleanup are run on teardown by calling doCleanups().
"""
mocker = MailMocker()
mocker.setUp()
try:
yield mocker
finally:
mocker.doCleanups()
+310 -296
View File
@@ -1,13 +1,14 @@
from datetime import timedelta
from unittest import mock
import pytest
import pytest_mock
from django.contrib.auth.models import Permission
from django.contrib.auth.models import User
from django.test import TestCase
from django.test import override_settings
from django.test import Client
from django.utils import timezone
from httpx_oauth.oauth2 import GetAccessTokenError
from httpx_oauth.oauth2 import RefreshTokenError
from pytest_django.fixtures import SettingsWrapper
from rest_framework import status
from paperless_mail.mail import MailAccountHandler
@@ -16,358 +17,371 @@ from paperless_mail.oauth import PaperlessMailOAuth2Manager
from paperless_mail.tests.factories import MailAccountFactory
@override_settings(
OAUTH_CALLBACK_BASE_URL="http://localhost:8000",
GMAIL_OAUTH_CLIENT_ID="test_gmail_client_id",
GMAIL_OAUTH_CLIENT_SECRET="test_gmail_client_secret",
OUTLOOK_OAUTH_CLIENT_ID="test_outlook_client_id",
OUTLOOK_OAUTH_CLIENT_SECRET="test_outlook_client_secret",
)
class TestMailOAuth(
TestCase,
):
def setUp(self) -> None:
self.user = User.objects.create_user("testuser")
self.user.user_permissions.add(
*Permission.objects.filter(
codename__in=[
"add_mailaccount",
],
@pytest.fixture
def oauth_manager() -> PaperlessMailOAuth2Manager:
return PaperlessMailOAuth2Manager()
@pytest.fixture
def oauth_session(client: Client) -> Client:
"""Seed the test client session with a known oauth_state."""
session = client.session
session.update({"oauth_state": "test_state"})
session.save()
return client
class TestOAuthUrlGeneration:
"""OAuth callback / redirect URL construction by PaperlessMailOAuth2Manager."""
@pytest.mark.parametrize(
("overrides", "expected"),
[
pytest.param(
{"OAUTH_CALLBACK_BASE_URL": "http://paperless.example.com"},
"http://paperless.example.com/api/oauth/callback/",
id="callback-base-url-set",
),
)
self.user.save()
self.client.force_login(self.user)
self.mail_account_handler = MailAccountHandler()
super().setUp()
def test_generate_paths(self) -> None:
"""
GIVEN:
- Mocked settings for OAuth callback and base URLs
WHEN:
- get_oauth_callback_url and get_oauth_redirect_url are called
THEN:
- Correct URLs are generated
"""
# Callback URL
oauth_manager = PaperlessMailOAuth2Manager()
with override_settings(OAUTH_CALLBACK_BASE_URL="http://paperless.example.com"):
self.assertEqual(
oauth_manager.oauth_callback_url,
pytest.param(
{
"OAUTH_CALLBACK_BASE_URL": None,
"PAPERLESS_URL": "http://paperless.example.com",
},
"http://paperless.example.com/api/oauth/callback/",
)
with override_settings(
OAUTH_CALLBACK_BASE_URL=None,
PAPERLESS_URL="http://paperless.example.com",
):
self.assertEqual(
oauth_manager.oauth_callback_url,
"http://paperless.example.com/api/oauth/callback/",
)
with override_settings(
OAUTH_CALLBACK_BASE_URL=None,
PAPERLESS_URL="http://paperless.example.com",
BASE_URL="/paperless/",
):
self.assertEqual(
oauth_manager.oauth_callback_url,
id="falls-back-to-paperless-url",
),
pytest.param(
{
"OAUTH_CALLBACK_BASE_URL": None,
"PAPERLESS_URL": "http://paperless.example.com",
"BASE_URL": "/paperless/",
},
"http://paperless.example.com/paperless/api/oauth/callback/",
)
# Redirect URL
with override_settings(DEBUG=True):
self.assertEqual(
oauth_manager.oauth_redirect_url,
"http://localhost:4200/mail",
)
with override_settings(DEBUG=False):
self.assertEqual(
oauth_manager.oauth_redirect_url,
"/mail",
)
@mock.patch(
"paperless_mail.oauth.PaperlessMailOAuth2Manager.get_gmail_access_token",
id="respects-base-url-prefix",
),
],
)
@mock.patch(
"paperless_mail.oauth.PaperlessMailOAuth2Manager.get_outlook_access_token",
)
def test_oauth_callback_view_success(
def test_oauth_callback_url(
self,
mock_get_outlook_access_token,
mock_get_gmail_access_token,
settings: SettingsWrapper,
oauth_manager: PaperlessMailOAuth2Manager,
overrides: dict,
expected: str,
) -> None:
"""
GIVEN:
- Mocked settings for Gmail and Outlook OAuth client IDs and secrets
- Various combinations of OAUTH_CALLBACK_BASE_URL, PAPERLESS_URL, and BASE_URL
WHEN:
- OAuth callback is called with a code and scope
- OAuth callback is called with a code and no scope
- oauth_callback_url is read from the manager
THEN:
- Gmail mail account is created
- Outlook mail account is created
- The expected fully-qualified callback URL is produced
"""
for key, value in overrides.items():
setattr(settings, key, value)
assert oauth_manager.oauth_callback_url == expected
mock_get_gmail_access_token.return_value = {
"access_token": "test_access_token",
"refresh_token": "test_refresh_token",
"expires_in": 3600,
}
mock_get_outlook_access_token.return_value = {
"access_token": "test_access_token",
"refresh_token": "test_refresh_token",
"expires_in": 3600,
}
session = self.client.session
session.update(
{
"oauth_state": "test_state",
},
)
session.save()
# Test Google OAuth callback
response = self.client.get(
"/api/oauth/callback/?code=test_code&scope=https://mail.google.com/&state=test_state",
)
self.assertEqual(response.status_code, status.HTTP_302_FOUND)
self.assertIn("oauth_success=1", response.url)
mock_get_gmail_access_token.assert_called_once()
self.assertTrue(
MailAccount.objects.filter(imap_server="imap.gmail.com").exists(),
)
# Test Outlook OAuth callback
response = self.client.get(
"/api/oauth/callback/?code=test_code&state=test_state",
)
self.assertEqual(response.status_code, status.HTTP_302_FOUND)
self.assertIn("oauth_success=1", response.url)
self.assertTrue(
MailAccount.objects.filter(imap_server="outlook.office365.com").exists(),
)
@mock.patch("httpx_oauth.oauth2.BaseOAuth2.get_access_token")
def test_oauth_callback_view_fails(self, mock_get_access_token) -> None:
"""
GIVEN:
- Mocked settings for Gmail and Outlook OAuth client IDs and secrets
WHEN:
- OAuth callback is called and get access token returns an error
THEN:
- No mail account is created
- Error is logged
"""
mock_get_access_token.side_effect = GetAccessTokenError("test_error")
session = self.client.session
session.update(
{
"oauth_state": "test_state",
},
)
session.save()
with self.assertLogs("paperless_mail", level="ERROR") as cm:
# Test Google OAuth callback
response = self.client.get(
"/api/oauth/callback/?code=test_code&scope=https://mail.google.com/&state=test_state",
)
self.assertEqual(response.status_code, status.HTTP_302_FOUND)
self.assertIn("oauth_success=0", response.url)
self.assertFalse(
MailAccount.objects.filter(imap_server="imap.gmail.com").exists(),
)
# Test Outlook OAuth callback
response = self.client.get(
"/api/oauth/callback/?code=test_code&state=test_state",
)
self.assertEqual(response.status_code, status.HTTP_302_FOUND)
self.assertIn("oauth_success=0", response.url)
self.assertFalse(
MailAccount.objects.filter(
imap_server="outlook.office365.com",
).exists(),
)
self.assertIn(
"Error getting access token from OAuth provider",
cm.output[0],
)
def test_oauth_callback_view_insufficient_permissions(self) -> None:
"""
GIVEN:
- Mocked settings for Gmail and Outlook OAuth client IDs and secrets
- User without add_mailaccount permission
WHEN:
- OAuth callback is called
THEN:
- 400 bad request returned, no mail accounts are created
"""
self.user.user_permissions.remove(
*Permission.objects.filter(
codename__in=[
"add_mailaccount",
],
@pytest.mark.parametrize(
("debug", "expected"),
[
pytest.param(
True,
"http://localhost:4200/mail",
id="debug-redirects-to-ng-dev",
),
)
self.user.save()
response = self.client.get(
"/api/oauth/callback/?code=test_code&scope=https://mail.google.com/",
)
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
self.assertFalse(
MailAccount.objects.filter(imap_server="imap.gmail.com").exists(),
)
self.assertFalse(
MailAccount.objects.filter(imap_server="outlook.office365.com").exists(),
)
def test_oauth_callback_view_no_code(self) -> None:
pytest.param(False, "/mail", id="prod-redirects-to-relative-path"),
],
)
def test_oauth_redirect_url(
self,
settings: SettingsWrapper,
oauth_manager: PaperlessMailOAuth2Manager,
debug: bool, # noqa: FBT001
expected: str,
) -> None:
"""
GIVEN:
- Mocked settings for Gmail and Outlook OAuth client IDs and secrets
- DEBUG is toggled on or off
WHEN:
- OAuth callback is called without a code
- oauth_redirect_url is read from the manager
THEN:
- 400 bad request returned, no mail accounts are created
- In DEBUG mode the Angular dev server URL is returned, otherwise a relative path
"""
settings.DEBUG = debug
assert oauth_manager.oauth_redirect_url == expected
response = self.client.get(
"/api/oauth/callback/",
)
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
self.assertFalse(
MailAccount.objects.filter(imap_server="imap.gmail.com").exists(),
)
self.assertFalse(
MailAccount.objects.filter(imap_server="outlook.office365.com").exists(),
)
def test_oauth_callback_view_invalid_state(self) -> None:
@pytest.mark.django_db
class TestOAuthCallbackView:
"""End-to-end behavior of the /api/oauth/callback/ endpoint."""
def test_no_code(
self,
client: Client,
mail_user: User,
oauth_settings: SettingsWrapper,
) -> None:
"""
GIVEN:
- Mocked settings for Gmail and Outlook OAuth client IDs and secrets
- OAuth client IDs and secrets configured
WHEN:
- OAuth callback is called with an invalid state
- The OAuth callback is called without a code parameter
THEN:
- 400 bad request returned, no mail accounts are created
- 400 Bad Request is returned and no mail account is created
"""
response = client.get("/api/oauth/callback/")
assert response.status_code == status.HTTP_400_BAD_REQUEST
assert not MailAccount.objects.exists()
response = self.client.get(
def test_invalid_state(
self,
client: Client,
mail_user: User,
oauth_settings: SettingsWrapper,
) -> None:
"""
GIVEN:
- OAuth client IDs and secrets configured
WHEN:
- The OAuth callback is called with a state that does not match the session
THEN:
- 400 Bad Request is returned and no mail account is created
"""
response = client.get(
"/api/oauth/callback/?code=test_code&state=invalid_state",
)
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
self.assertFalse(
MailAccount.objects.filter(imap_server="imap.gmail.com").exists(),
)
self.assertFalse(
MailAccount.objects.filter(imap_server="outlook.office365.com").exists(),
)
assert response.status_code == status.HTTP_400_BAD_REQUEST
assert not MailAccount.objects.exists()
@mock.patch("paperless_mail.mail.get_mailbox")
@mock.patch(
"httpx_oauth.oauth2.BaseOAuth2.refresh_token",
)
def test_refresh_token_on_handle_mail_account(
def test_insufficient_permissions(
self,
mock_refresh_token,
mock_get_mailbox,
client: Client,
mail_user: User,
oauth_settings: SettingsWrapper,
) -> None:
"""
GIVEN:
- Mail account with refresh token and expiration
- OAuth client IDs and secrets configured
- User without add_mailaccount permission
WHEN:
- handle_mail_account is called
- The OAuth callback is called
THEN:
- Refresh token is called
- 400 Bad Request is returned and no mail account is created
"""
mock_mailbox = mock.MagicMock()
mock_get_mailbox.return_value.__enter__.return_value = mock_mailbox
mail_account = MailAccountFactory(
name="Test Gmail Mail Account",
username="test_username",
account_type=MailAccount.MailAccountType.GMAIL_OAUTH,
is_token=True,
refresh_token="test_refresh_token",
expiration=timezone.now() - timedelta(days=1),
mail_user.user_permissions.remove(
*Permission.objects.filter(codename="add_mailaccount"),
)
mock_refresh_token.return_value = {
response = client.get(
"/api/oauth/callback/?code=test_code&scope=https://mail.google.com/",
)
assert response.status_code == status.HTTP_400_BAD_REQUEST
assert not MailAccount.objects.exists()
@pytest.mark.parametrize(
("provider", "callback_query", "expected_imap"),
[
pytest.param(
"gmail",
"code=test_code&scope=https://mail.google.com/&state=test_state",
"imap.gmail.com",
id="gmail",
),
pytest.param(
"outlook",
"code=test_code&state=test_state",
"outlook.office365.com",
id="outlook",
),
],
)
def test_success(
self,
client: Client,
mail_user: User,
oauth_settings: SettingsWrapper,
oauth_session: Client,
mocker: pytest_mock.MockerFixture,
provider: str,
callback_query: str,
expected_imap: str,
) -> None:
"""
GIVEN:
- OAuth client IDs and secrets configured for Gmail and Outlook
- A valid oauth_state seeded in the session
WHEN:
- The OAuth callback is called with a code and provider-specific scope
THEN:
- A redirect with oauth_success=1 is returned
- The provider's access-token method is invoked
- A mail account for the matching provider is created
"""
token_payload = {
"access_token": "test_access_token",
"refresh_token": "test_refresh_token",
"expires_in": 3600,
}
self.mail_account_handler.handle_mail_account(mail_account)
mock_refresh_token.assert_called_once()
mock_refresh_token.reset_mock()
mock_refresh_token.return_value = {
"access_token": "test_access_token",
"refresh_token": "test_refresh",
"expires_in": 3600,
}
outlook_mail_account = MailAccountFactory(
name="Test Outlook Mail Account",
username="test_username",
account_type=MailAccount.MailAccountType.OUTLOOK_OAUTH,
is_token=True,
refresh_token="test_refresh_token",
expiration=timezone.now() - timedelta(days=1),
mocked = mocker.patch(
f"paperless_mail.oauth.PaperlessMailOAuth2Manager.get_{provider}_access_token",
return_value=token_payload,
)
self.mail_account_handler.handle_mail_account(outlook_mail_account)
mock_refresh_token.assert_called_once()
response = client.get(f"/api/oauth/callback/?{callback_query}")
@mock.patch("paperless_mail.mail.get_mailbox")
@mock.patch(
"httpx_oauth.oauth2.BaseOAuth2.refresh_token",
assert response.status_code == status.HTTP_302_FOUND
assert "oauth_success=1" in response.url
mocked.assert_called_once()
assert MailAccount.objects.filter(imap_server=expected_imap).exists()
@pytest.mark.parametrize(
("callback_query", "imap_server"),
[
pytest.param(
"code=test_code&scope=https://mail.google.com/&state=test_state",
"imap.gmail.com",
id="gmail",
),
pytest.param(
"code=test_code&state=test_state",
"outlook.office365.com",
id="outlook",
),
],
)
def test_refresh_token_on_handle_mail_account_fails(
def test_provider_error(
self,
mock_refresh_token,
mock_get_mailbox,
client: Client,
mail_user: User,
oauth_settings: SettingsWrapper,
oauth_session: Client,
mocker: pytest_mock.MockerFixture,
caplog: pytest.LogCaptureFixture,
callback_query: str,
imap_server: str,
) -> None:
"""
GIVEN:
- Mail account with refresh token and expiration
- OAuth client IDs and secrets configured
- The provider's access-token endpoint raises GetAccessTokenError
WHEN:
- handle_mail_account is called
- Refresh token is called but fails
- The OAuth callback is called with a code (Gmail or Outlook)
THEN:
- Error is logged
- 0 processed mails is returned
- A redirect with oauth_success=0 is returned
- No mail account is created
- The failure is logged at ERROR level
"""
mocker.patch(
"httpx_oauth.oauth2.BaseOAuth2.get_access_token",
side_effect=GetAccessTokenError("test_error"),
)
mock_mailbox = mock.MagicMock()
mock_get_mailbox.return_value.__enter__.return_value = mock_mailbox
with caplog.at_level("ERROR", logger="paperless_mail"):
response = client.get(f"/api/oauth/callback/?{callback_query}")
mail_account = MailAccountFactory(
name="Test Gmail Mail Account",
assert response.status_code == status.HTTP_302_FOUND
assert "oauth_success=0" in response.url
assert not MailAccount.objects.filter(imap_server=imap_server).exists()
assert any(
"Error getting access token from OAuth provider" in record.message
for record in caplog.records
)
@pytest.fixture
def expired_oauth_account_factory(db: None, mocker: pytest_mock.MockerFixture):
"""
Build an OAuth-backed MailAccount whose access token has already expired,
while patching `get_mailbox` so no real IMAP connection is attempted.
"""
mocker.patch(
"paperless_mail.mail.get_mailbox",
).return_value.__enter__.return_value = mocker.MagicMock()
def _make(account_type: MailAccount.MailAccountType) -> MailAccount:
return MailAccountFactory(
username="test_username",
account_type=MailAccount.MailAccountType.GMAIL_OAUTH,
account_type=account_type,
is_token=True,
refresh_token="test_refresh_token",
expiration=timezone.now() - timedelta(days=1),
)
mock_refresh_token.side_effect = RefreshTokenError("test_error")
return _make
with self.assertLogs("paperless_mail", level="ERROR") as cm:
# returns 0 processed mails
self.assertEqual(
self.mail_account_handler.handle_mail_account(mail_account),
0,
)
mock_refresh_token.assert_called_once()
self.assertIn(
f"Failed to refresh oauth token for account {mail_account}: test_error",
cm.output[0],
)
@pytest.mark.django_db
class TestRefreshTokenOnHandleMailAccount:
"""OAuth refresh-token flow exercised through MailAccountHandler.handle_mail_account."""
@pytest.mark.parametrize(
"account_type",
[
pytest.param(MailAccount.MailAccountType.GMAIL_OAUTH, id="gmail"),
pytest.param(MailAccount.MailAccountType.OUTLOOK_OAUTH, id="outlook"),
],
)
def test_refresh_token_called(
self,
mocker: pytest_mock.MockerFixture,
mail_account_handler: MailAccountHandler,
expired_oauth_account_factory,
account_type: MailAccount.MailAccountType,
) -> None:
"""
GIVEN:
- An OAuth-backed mail account with a refresh token and an expired access token
WHEN:
- handle_mail_account is called
THEN:
- The OAuth refresh_token endpoint is invoked exactly once
"""
mock_refresh = mocker.patch(
"httpx_oauth.oauth2.BaseOAuth2.refresh_token",
return_value={
"access_token": "test_access_token",
"refresh_token": "test_refresh_token",
"expires_in": 3600,
},
)
mail_account_handler.handle_mail_account(
expired_oauth_account_factory(account_type),
)
mock_refresh.assert_called_once()
def test_refresh_token_failure(
self,
mocker: pytest_mock.MockerFixture,
caplog: pytest.LogCaptureFixture,
mail_account_handler: MailAccountHandler,
expired_oauth_account_factory,
) -> None:
"""
GIVEN:
- An OAuth-backed mail account with a refresh token and an expired access token
- The OAuth refresh_token endpoint raises RefreshTokenError
WHEN:
- handle_mail_account is called
THEN:
- 0 processed mails is returned
- The failure is logged at ERROR level with the account context
"""
mock_refresh = mocker.patch(
"httpx_oauth.oauth2.BaseOAuth2.refresh_token",
side_effect=RefreshTokenError("test_error"),
)
account = expired_oauth_account_factory(
MailAccount.MailAccountType.GMAIL_OAUTH,
)
with caplog.at_level("ERROR", logger="paperless_mail"):
result = mail_account_handler.handle_mail_account(account)
assert result == 0
mock_refresh.assert_called_once()
assert any(
f"Failed to refresh oauth token for account {account}: test_error"
in record.message
for record in caplog.records
)
+209 -174
View File
@@ -1,64 +1,68 @@
import email
import email.contentmanager
import shutil
import subprocess
import tempfile
from collections.abc import Generator
from email.message import Message
from email.mime.application import MIMEApplication
from email.mime.multipart import MIMEMultipart
from unittest import mock
from pathlib import Path
import gnupg
from django.test import override_settings
import pytest
from imap_tools import MailMessage
from pytest_django.fixtures import SettingsWrapper
from pytest_mock import MockerFixture
from paperless_mail.mail import MailAccountHandler
from paperless_mail.models import MailRule
from paperless_mail.preprocessor import MailMessageDecryptor
from paperless_mail.tests.factories import MailAccountFactory
from paperless_mail.tests.test_mail import TestMail
from paperless_mail.tests.test_mail import MailMocker
from paperless_mail.tests.test_mail import _AttachmentDef
class MessageEncryptor:
def __init__(self) -> None:
self.gpg_home = tempfile.mkdtemp()
self.gpg = gnupg.GPG(gnupghome=self.gpg_home)
self._testUser = "testuser@example.com"
# Generate a new key
input_data = self.gpg.gen_key_input(
name_email=self._testUser,
passphrase=None,
key_type="RSA",
key_length=2048,
expire_date=0,
no_protection=True,
def _kill_gpg_agent(gpg_home: str) -> None:
"""
Terminate any gpg-agent attached to `gpg_home` so the directory is
removable. Uses `gpgconf --kill`, the GnuPG project's recommended cleanup
path; python-gnupg has no built-in cleanup since it only wraps the CLI.
"""
try:
subprocess.run(
["gpgconf", "--kill", "gpg-agent"],
env={"GNUPGHOME": gpg_home},
check=False,
capture_output=True,
timeout=5,
)
self.gpg.gen_key(input_data)
except (FileNotFoundError, subprocess.TimeoutExpired):
pass
def cleanup(self) -> None:
"""
Kill the gpg-agent process and clean up the temporary GPG home directory.
This uses gpgconf to properly terminate the agent, which is the officially
recommended cleanup method from the GnuPG project. python-gnupg does not
provide built-in cleanup methods as it's only a wrapper around the gpg CLI.
"""
# Kill the gpg-agent using the official GnuPG cleanup tool
try:
subprocess.run(
["gpgconf", "--kill", "gpg-agent"],
env={"GNUPGHOME": self.gpg_home},
check=False,
capture_output=True,
timeout=5,
)
except (FileNotFoundError, subprocess.TimeoutExpired):
# gpgconf not found or hung - agent will timeout eventually
pass
class MessageEncryptor:
"""
Test helper: generates a throwaway GPG keypair in a tempdir and exposes
`encrypt(MailMessage) -> MailMessage`.
"""
# Clean up the temporary directory
shutil.rmtree(self.gpg_home, ignore_errors=True)
TEST_USER = "testuser@example.com"
def __init__(self, gpg_home: Path) -> None:
self.gpg_home = str(gpg_home)
self.gpg = gnupg.GPG(gnupghome=self.gpg_home)
self.gpg.gen_key(
self.gpg.gen_key_input(
name_email=self.TEST_USER,
passphrase=None,
key_type="RSA",
key_length=2048,
expire_date=0,
no_protection=True,
),
)
def kill_agent(self) -> None:
_kill_gpg_agent(self.gpg_home)
@staticmethod
def get_email_body_without_headers(email_message: Message) -> bytes:
@@ -66,7 +70,6 @@ class MessageEncryptor:
Filters some relevant headers from an EmailMessage and returns just the body.
"""
message_copy = email.message_from_bytes(email_message.as_bytes())
message_copy._headers = [
header
for header in message_copy._headers
@@ -74,203 +77,235 @@ class MessageEncryptor:
]
return message_copy.as_bytes()
def encrypt(self, message):
original_email: email.message.Message = message.obj
def encrypt(self, message: MailMessage) -> MailMessage:
original_email: Message = message.obj
encrypted_data = self.gpg.encrypt(
self.get_email_body_without_headers(original_email),
self._testUser,
self.TEST_USER,
armor=True,
)
if not encrypted_data.ok:
raise Exception(f"Encryption failed: {encrypted_data.stderr}")
encrypted_email_content = encrypted_data.data
new_email = MIMEMultipart("encrypted", protocol="application/pgp-encrypted")
new_email["From"] = original_email["From"]
new_email["To"] = original_email["To"]
new_email["Subject"] = original_email["Subject"]
# Add the control part
control_part = MIMEApplication(_data=b"", _subtype="pgp-encrypted")
control_part.set_payload("Version: 1")
new_email.attach(control_part)
# Add the encrypted data part
encrypted_part = MIMEApplication(_data=b"", _subtype="octet-stream")
encrypted_part.set_payload(encrypted_email_content.decode("ascii"))
encrypted_part.set_payload(encrypted_data.data.decode("ascii"))
encrypted_part.add_header(
"Content-Disposition",
'attachment; filename="encrypted.asc"',
)
new_email.attach(encrypted_part)
encrypted_message: MailMessage = MailMessage(
return MailMessage(
[(f"UID {message.uid}".encode(), new_email.as_bytes())],
)
return encrypted_message
class TestMailMessageGpgDecryptor(TestMail):
@classmethod
def setUpClass(cls) -> None:
"""Create GPG encryptor once for all tests in this class."""
super().setUpClass()
cls.messageEncryptor = MessageEncryptor()
@pytest.fixture(scope="session")
def message_encryptor(
tmp_path_factory: pytest.TempPathFactory,
) -> Generator[MessageEncryptor, None, None]:
"""
Session-scoped: GPG keypair generation is slow (~1s+), and nothing in
these tests mutates the keyring after creation. The GPG home directory
comes from `tmp_path_factory` so pytest cleans it up at session end;
we still kill the gpg-agent ourselves so the dir is removable.
"""
encryptor = MessageEncryptor(tmp_path_factory.mktemp("gpg-home"))
yield encryptor
encryptor.kill_agent()
@classmethod
def tearDownClass(cls) -> None:
"""Clean up GPG resources after all tests complete."""
if hasattr(cls, "messageEncryptor"):
cls.messageEncryptor.cleanup()
super().tearDownClass()
def setUp(self) -> None:
with override_settings(
EMAIL_GNUPG_HOME=self.messageEncryptor.gpg_home,
EMAIL_ENABLE_GPG_DECRYPTOR=True,
):
super().setUp()
@pytest.fixture
def gpg_settings(
settings: SettingsWrapper,
message_encryptor: MessageEncryptor,
) -> SettingsWrapper:
settings.EMAIL_GNUPG_HOME = message_encryptor.gpg_home
settings.EMAIL_ENABLE_GPG_DECRYPTOR = True
return settings
def test_preprocessor_is_able_to_run(self) -> None:
with override_settings(
EMAIL_GNUPG_HOME=self.messageEncryptor.gpg_home,
EMAIL_ENABLE_GPG_DECRYPTOR=True,
):
self.assertTrue(MailMessageDecryptor.able_to_run())
def test_preprocessor_is_able_to_run2(self) -> None:
with override_settings(
EMAIL_GNUPG_HOME=None,
EMAIL_ENABLE_GPG_DECRYPTOR=True,
):
self.assertTrue(MailMessageDecryptor.able_to_run())
@pytest.fixture
def encrypted_pair(
mail_mocker: MailMocker,
message_encryptor: MessageEncryptor,
) -> tuple[MailMessage, MailMessage]:
"""
Build a (encrypted, plaintext) MailMessage pair sharing the same UID and
headers, with two PDF attachments on the plaintext side.
"""
plaintext = mail_mocker.messageBuilder.create_message(
body="Test message with 2 attachments",
attachments=[
_AttachmentDef(filename="f1.pdf", disposition="inline"),
_AttachmentDef(filename="f2.pdf"),
],
)
return message_encryptor.encrypt(plaintext), plaintext
def test_is_not_able_to_run_disabled(self) -> None:
with override_settings(
EMAIL_ENABLE_GPG_DECRYPTOR=False,
):
self.assertFalse(MailMessageDecryptor.able_to_run())
def test_is_not_able_to_run_bogus_path(self) -> None:
with override_settings(
EMAIL_ENABLE_GPG_DECRYPTOR=True,
EMAIL_GNUPG_HOME="_)@# notapath &%#$",
):
self.assertFalse(MailMessageDecryptor.able_to_run())
# Sentinel used in `test_able_to_run` parametrization to request the real
# GPG home from the session-scoped `message_encryptor` fixture at runtime.
_VALID_GPG_HOME = object()
def test_fails_at_initialization(self) -> None:
with (
mock.patch("gnupg.GPG.__init__") as mock_run,
override_settings(
EMAIL_ENABLE_GPG_DECRYPTOR=True,
class TestMailMessageDecryptorAbleToRun:
"""`MailMessageDecryptor.able_to_run()` configuration matrix."""
@pytest.mark.parametrize(
("settings_overrides", "expected"),
[
pytest.param(
{
"EMAIL_GNUPG_HOME": _VALID_GPG_HOME,
"EMAIL_ENABLE_GPG_DECRYPTOR": True,
},
True,
id="enabled-with-valid-home",
),
):
pytest.param(
{"EMAIL_GNUPG_HOME": None, "EMAIL_ENABLE_GPG_DECRYPTOR": True},
True,
id="enabled-with-default-home",
),
pytest.param(
{"EMAIL_ENABLE_GPG_DECRYPTOR": False},
False,
id="disabled",
),
pytest.param(
{
"EMAIL_ENABLE_GPG_DECRYPTOR": True,
"EMAIL_GNUPG_HOME": "_)@# notapath &%#$",
},
False,
id="enabled-with-bogus-path",
),
],
)
def test_able_to_run(
self,
settings: SettingsWrapper,
message_encryptor: MessageEncryptor,
settings_overrides: dict,
*,
expected: bool,
) -> None:
for key, value in settings_overrides.items():
if value is _VALID_GPG_HOME:
value = message_encryptor.gpg_home
setattr(settings, key, value)
assert MailMessageDecryptor.able_to_run() is expected
def side_effect(*args, **kwargs):
raise OSError("Cannot find 'gpg' binary")
mock_run.side_effect = side_effect
@pytest.mark.django_db
class TestMailMessageDecryptor:
"""End-to-end decrypt and consumption flow with a real GPG keyring."""
handler = MailAccountHandler()
self.assertEqual(len(handler._message_preprocessors), 0)
def test_fails_at_initialization(
self,
settings: SettingsWrapper,
mocker: MockerFixture,
) -> None:
settings.EMAIL_ENABLE_GPG_DECRYPTOR = True
mocker.patch(
"gnupg.GPG.__init__",
side_effect=OSError("Cannot find 'gpg' binary"),
)
handler = MailAccountHandler()
assert len(handler._message_preprocessors) == 0
def test_decrypt_fails(
self,
settings: SettingsWrapper,
encrypted_pair: tuple[MailMessage, MailMessage],
tmp_path: Path,
) -> None:
"""
A decryptor pointed at a fresh empty GPG home cannot decrypt the
message — ensure it surfaces an exception rather than silently passing
bytes through.
"""
encrypted_message, _ = encrypted_pair
empty_gpg_home = tmp_path / "empty-gpg-home"
empty_gpg_home.mkdir()
settings.EMAIL_ENABLE_GPG_DECRYPTOR = True
settings.EMAIL_GNUPG_HOME = str(empty_gpg_home)
def test_decrypt_fails(self) -> None:
encrypted_message, _ = self.create_encrypted_unencrypted_message_pair()
# This test creates its own empty GPG home to test decryption failure
empty_gpg_home = tempfile.mkdtemp()
try:
with override_settings(
EMAIL_ENABLE_GPG_DECRYPTOR=True,
EMAIL_GNUPG_HOME=empty_gpg_home,
):
message_decryptor = MailMessageDecryptor()
self.assertRaises(Exception, message_decryptor.run, encrypted_message)
with pytest.raises(Exception):
MailMessageDecryptor().run(encrypted_message)
finally:
# Clean up the temporary GPG home used only by this test
try:
subprocess.run(
["gpgconf", "--kill", "gpg-agent"],
env={"GNUPGHOME": empty_gpg_home},
check=False,
capture_output=True,
timeout=5,
)
except (FileNotFoundError, subprocess.TimeoutExpired):
pass
shutil.rmtree(empty_gpg_home, ignore_errors=True)
_kill_gpg_agent(str(empty_gpg_home))
def test_decrypt_encrypted_mail(self) -> None:
def test_decrypt_encrypted_mail(
self,
gpg_settings: SettingsWrapper,
encrypted_pair: tuple[MailMessage, MailMessage],
) -> None:
"""
Creates a mail with attachments. Then encrypts it with a new key.
Verifies that this encrypted message can be decrypted with attachments intact.
"""
encrypted_message, message = self.create_encrypted_unencrypted_message_pair()
headers = message.headers
text = message.text
encrypted_message, plaintext = encrypted_pair
self.assertEqual(len(encrypted_message.attachments), 1)
self.assertEqual(encrypted_message.attachments[0].filename, "encrypted.asc")
self.assertEqual(encrypted_message.text, "")
assert len(encrypted_message.attachments) == 1
assert encrypted_message.attachments[0].filename == "encrypted.asc"
assert encrypted_message.text == ""
with override_settings(
EMAIL_ENABLE_GPG_DECRYPTOR=True,
EMAIL_GNUPG_HOME=self.messageEncryptor.gpg_home,
):
message_decryptor = MailMessageDecryptor()
self.assertTrue(message_decryptor.able_to_run())
decrypted_message = message_decryptor.run(encrypted_message)
decryptor = MailMessageDecryptor()
assert decryptor.able_to_run()
decrypted = decryptor.run(encrypted_message)
self.assertEqual(len(decrypted_message.attachments), 2)
self.assertEqual(decrypted_message.attachments[0].filename, "f1.pdf")
self.assertEqual(decrypted_message.attachments[1].filename, "f2.pdf")
self.assertEqual(decrypted_message.headers, headers)
self.assertEqual(decrypted_message.text, text)
self.assertEqual(decrypted_message.uid, message.uid)
assert len(decrypted.attachments) == 2
assert decrypted.attachments[0].filename == "f1.pdf"
assert decrypted.attachments[1].filename == "f2.pdf"
assert decrypted.headers == plaintext.headers
assert decrypted.text == plaintext.text
assert decrypted.uid == plaintext.uid
def create_encrypted_unencrypted_message_pair(self):
message = self.mailMocker.messageBuilder.create_message(
body="Test message with 2 attachments",
attachments=[
_AttachmentDef(
filename="f1.pdf",
disposition="inline",
),
_AttachmentDef(filename="f2.pdf"),
],
)
encrypted_message = self.messageEncryptor.encrypt(message)
return encrypted_message, message
def test_handle_encrypted_message(self) -> None:
message = self.mailMocker.messageBuilder.create_message(
def test_handle_encrypted_message(
self,
gpg_settings: SettingsWrapper,
mail_mocker: MailMocker,
message_encryptor: MessageEncryptor,
) -> None:
plaintext = mail_mocker.messageBuilder.create_message(
subject="the message title",
from_="Myself",
attachments=2,
body="Test mail",
)
encrypted = message_encryptor.encrypt(plaintext)
encrypted_message = self.messageEncryptor.encrypt(message)
account = MailAccountFactory()
rule = MailRule(
rule = MailRule.objects.create(
assign_title_from=MailRule.TitleSource.FROM_FILENAME,
consumption_scope=MailRule.ConsumptionScope.EVERYTHING,
account=account,
account=MailAccountFactory(),
)
rule.save()
result = self.mail_account_handler._handle_message(encrypted_message, rule)
result = MailAccountHandler()._handle_message(encrypted, rule)
self.assertEqual(result, 3)
self.mailMocker._queue_consumption_tasks_mock.assert_called()
self.mailMocker.assert_queue_consumption_tasks_call_args(
assert result == 3
mail_mocker._queue_consumption_tasks_mock.assert_called()
mail_mocker.assert_queue_consumption_tasks_call_args(
[
[
{
"override_title": message.subject,
"override_filename": f"{message.subject}.eml",
"override_title": plaintext.subject,
"override_filename": f"{plaintext.subject}.eml",
},
],
[