Compare commits

..

1 Commits

Author SHA1 Message Date
Trenton Holmes 1307b66993 Upgrades uv to the 0.11.x branch 2026-05-02 12:46:15 -07:00
11 changed files with 490 additions and 583 deletions
+1 -1
View File
@@ -11,7 +11,7 @@ concurrency:
group: backend-${{ github.event.pull_request.number || github.ref }}
cancel-in-progress: true
env:
DEFAULT_UV_VERSION: "0.10.x"
DEFAULT_UV_VERSION: "0.11.x"
NLTK_DATA: "/usr/share/nltk_data"
permissions: {}
jobs:
+1 -1
View File
@@ -11,7 +11,7 @@ concurrency:
permissions:
contents: read
env:
DEFAULT_UV_VERSION: "0.10.x"
DEFAULT_UV_VERSION: "0.11.x"
DEFAULT_PYTHON_VERSION: "3.12"
jobs:
changes:
+1 -1
View File
@@ -8,7 +8,7 @@ concurrency:
group: release-${{ github.ref }}
cancel-in-progress: false
env:
DEFAULT_UV_VERSION: "0.10.x"
DEFAULT_UV_VERSION: "0.11.x"
DEFAULT_PYTHON_VERSION: "3.12"
permissions: {}
jobs:
+3
View File
@@ -3,6 +3,8 @@ on:
push:
branches:
- dev
env:
DEFAULT_UV_VERSION: "0.11.x"
jobs:
generate-translate-strings:
name: Generate Translation Strings
@@ -29,6 +31,7 @@ jobs:
- name: Install uv
uses: astral-sh/setup-uv@cec208311dfd045dd5311c1add060b2062131d57 # v8.0.0
with:
version: ${{ env.DEFAULT_UV_VERSION }}
enable-cache: true
- name: Install backend python dependencies
run: |
+1 -1
View File
@@ -30,7 +30,7 @@ RUN set -eux \
# Purpose: Installs s6-overlay and rootfs
# Comments:
# - Don't leave anything extra in here either
FROM ghcr.io/astral-sh/uv:0.10.9-python3.12-trixie-slim AS s6-overlay-base
FROM ghcr.io/astral-sh/uv:0.11.6-python3.12-trixie-slim AS s6-overlay-base
WORKDIR /usr/src/s6
+2 -3
View File
@@ -12,7 +12,6 @@ from pytest_django.fixtures import SettingsWrapper
from rest_framework.test import APIClient
from documents.tests.factories import DocumentFactory
from documents.tests.factories import UserFactory
UserModelT = get_user_model()
@@ -130,9 +129,9 @@ def rest_api_client():
@pytest.fixture()
def regular_user(db) -> UserModelT:
def regular_user(django_user_model: type[UserModelT]) -> UserModelT:
"""Unprivileged authenticated user for permission boundary tests."""
return UserFactory.create()
return django_user_model.objects.create_user(username="regular", password="regular")
@pytest.fixture()
+1 -1
View File
@@ -35,7 +35,7 @@ def user_admin() -> PaperlessUserAdmin:
@pytest.fixture
def staff_user(db) -> User:
return UserFactory.create(staff=True)
return UserFactory.create(username="staff", staff=True)
class TestDocumentAdmin(DirectoriesMixin, TestCase):
+1 -1
View File
@@ -649,7 +649,7 @@ class TestDocumentConsumptionFinishedSignal:
assert doc_with_keyword.correspondent == correspondent
def test_correspondent_not_applied(self, doc_with_keyword: Document) -> None:
CorrespondentFactory.create(
TagFactory.create(
match="no-match",
matching_algorithm=MatchingModel.MATCH_ANY,
)
+2 -48
View File
@@ -1,18 +1,13 @@
from collections.abc import Generator
import pytest
from django.contrib.auth.models import Permission
from django.contrib.auth.models import User
from django.test import Client
from pytest_django.fixtures import SettingsWrapper
from paperless_mail.mail import MailAccountHandler
from paperless_mail.models import MailAccount
from paperless_mail.tests.factories import MailAccountFactory
from paperless_mail.tests.test_mail import MailMocker
@pytest.fixture
@pytest.fixture()
def greenmail_mail_account(db: None) -> Generator[MailAccount, None, None]:
"""
Create a mail account configured for local Greenmail server.
@@ -29,47 +24,6 @@ def greenmail_mail_account(db: None) -> Generator[MailAccount, None, None]:
account.delete()
@pytest.fixture
@pytest.fixture()
def mail_account_handler() -> MailAccountHandler:
return MailAccountHandler()
@pytest.fixture
def mail_user(db: None, django_user_model, client: Client) -> User:
"""
Create a user with the `add_mailaccount` permission and log them in via
the test client. Returned so tests can mutate permissions if needed.
"""
user = django_user_model.objects.create_user("testuser")
user.user_permissions.add(*Permission.objects.filter(codename="add_mailaccount"))
client.force_login(user)
return user
@pytest.fixture
def oauth_settings(settings: SettingsWrapper) -> SettingsWrapper:
"""
Apply the OAuth callback / client-id settings the OAuth flow needs. Uses
pytest-django's `settings` fixture so values are reverted automatically.
"""
settings.OAUTH_CALLBACK_BASE_URL = "http://localhost:8000"
settings.GMAIL_OAUTH_CLIENT_ID = "test_gmail_client_id"
settings.GMAIL_OAUTH_CLIENT_SECRET = "test_gmail_client_secret"
settings.OUTLOOK_OAUTH_CLIENT_ID = "test_outlook_client_id"
settings.OUTLOOK_OAUTH_CLIENT_SECRET = "test_outlook_client_secret"
return settings
@pytest.fixture
def mail_mocker(db: None) -> Generator[MailMocker, None, None]:
"""
Provides a MailMocker instance with its `MailBox` and
`queue_consumption_tasks` patches active. Cleanups registered via
TestCase.addCleanup are run on teardown by calling doCleanups().
"""
mocker = MailMocker()
mocker.setUp()
try:
yield mocker
finally:
mocker.doCleanups()
+303 -317
View File
@@ -1,14 +1,13 @@
from datetime import timedelta
from unittest import mock
import pytest
import pytest_mock
from django.contrib.auth.models import Permission
from django.contrib.auth.models import User
from django.test import Client
from django.test import TestCase
from django.test import override_settings
from django.utils import timezone
from httpx_oauth.oauth2 import GetAccessTokenError
from httpx_oauth.oauth2 import RefreshTokenError
from pytest_django.fixtures import SettingsWrapper
from rest_framework import status
from paperless_mail.mail import MailAccountHandler
@@ -17,371 +16,358 @@ from paperless_mail.oauth import PaperlessMailOAuth2Manager
from paperless_mail.tests.factories import MailAccountFactory
@pytest.fixture
def oauth_manager() -> PaperlessMailOAuth2Manager:
return PaperlessMailOAuth2Manager()
@pytest.fixture
def oauth_session(client: Client) -> Client:
"""Seed the test client session with a known oauth_state."""
session = client.session
session.update({"oauth_state": "test_state"})
session.save()
return client
class TestOAuthUrlGeneration:
"""OAuth callback / redirect URL construction by PaperlessMailOAuth2Manager."""
@pytest.mark.parametrize(
("overrides", "expected"),
[
pytest.param(
{"OAUTH_CALLBACK_BASE_URL": "http://paperless.example.com"},
"http://paperless.example.com/api/oauth/callback/",
id="callback-base-url-set",
@override_settings(
OAUTH_CALLBACK_BASE_URL="http://localhost:8000",
GMAIL_OAUTH_CLIENT_ID="test_gmail_client_id",
GMAIL_OAUTH_CLIENT_SECRET="test_gmail_client_secret",
OUTLOOK_OAUTH_CLIENT_ID="test_outlook_client_id",
OUTLOOK_OAUTH_CLIENT_SECRET="test_outlook_client_secret",
)
class TestMailOAuth(
TestCase,
):
def setUp(self) -> None:
self.user = User.objects.create_user("testuser")
self.user.user_permissions.add(
*Permission.objects.filter(
codename__in=[
"add_mailaccount",
],
),
pytest.param(
{
"OAUTH_CALLBACK_BASE_URL": None,
"PAPERLESS_URL": "http://paperless.example.com",
},
)
self.user.save()
self.client.force_login(self.user)
self.mail_account_handler = MailAccountHandler()
super().setUp()
def test_generate_paths(self) -> None:
"""
GIVEN:
- Mocked settings for OAuth callback and base URLs
WHEN:
- get_oauth_callback_url and get_oauth_redirect_url are called
THEN:
- Correct URLs are generated
"""
# Callback URL
oauth_manager = PaperlessMailOAuth2Manager()
with override_settings(OAUTH_CALLBACK_BASE_URL="http://paperless.example.com"):
self.assertEqual(
oauth_manager.oauth_callback_url,
"http://paperless.example.com/api/oauth/callback/",
id="falls-back-to-paperless-url",
),
pytest.param(
{
"OAUTH_CALLBACK_BASE_URL": None,
"PAPERLESS_URL": "http://paperless.example.com",
"BASE_URL": "/paperless/",
},
)
with override_settings(
OAUTH_CALLBACK_BASE_URL=None,
PAPERLESS_URL="http://paperless.example.com",
):
self.assertEqual(
oauth_manager.oauth_callback_url,
"http://paperless.example.com/api/oauth/callback/",
)
with override_settings(
OAUTH_CALLBACK_BASE_URL=None,
PAPERLESS_URL="http://paperless.example.com",
BASE_URL="/paperless/",
):
self.assertEqual(
oauth_manager.oauth_callback_url,
"http://paperless.example.com/paperless/api/oauth/callback/",
id="respects-base-url-prefix",
),
],
)
def test_oauth_callback_url(
self,
settings: SettingsWrapper,
oauth_manager: PaperlessMailOAuth2Manager,
overrides: dict,
expected: str,
) -> None:
"""
GIVEN:
- Various combinations of OAUTH_CALLBACK_BASE_URL, PAPERLESS_URL, and BASE_URL
WHEN:
- oauth_callback_url is read from the manager
THEN:
- The expected fully-qualified callback URL is produced
"""
for key, value in overrides.items():
setattr(settings, key, value)
assert oauth_manager.oauth_callback_url == expected
)
@pytest.mark.parametrize(
("debug", "expected"),
[
pytest.param(
True,
# Redirect URL
with override_settings(DEBUG=True):
self.assertEqual(
oauth_manager.oauth_redirect_url,
"http://localhost:4200/mail",
id="debug-redirects-to-ng-dev",
),
pytest.param(False, "/mail", id="prod-redirects-to-relative-path"),
],
)
with override_settings(DEBUG=False):
self.assertEqual(
oauth_manager.oauth_redirect_url,
"/mail",
)
@mock.patch(
"paperless_mail.oauth.PaperlessMailOAuth2Manager.get_gmail_access_token",
)
def test_oauth_redirect_url(
self,
settings: SettingsWrapper,
oauth_manager: PaperlessMailOAuth2Manager,
debug: bool, # noqa: FBT001
expected: str,
) -> None:
"""
GIVEN:
- DEBUG is toggled on or off
WHEN:
- oauth_redirect_url is read from the manager
THEN:
- In DEBUG mode the Angular dev server URL is returned, otherwise a relative path
"""
settings.DEBUG = debug
assert oauth_manager.oauth_redirect_url == expected
@pytest.mark.django_db
class TestOAuthCallbackView:
"""End-to-end behavior of the /api/oauth/callback/ endpoint."""
def test_no_code(
self,
client: Client,
mail_user: User,
oauth_settings: SettingsWrapper,
) -> None:
"""
GIVEN:
- OAuth client IDs and secrets configured
WHEN:
- The OAuth callback is called without a code parameter
THEN:
- 400 Bad Request is returned and no mail account is created
"""
response = client.get("/api/oauth/callback/")
assert response.status_code == status.HTTP_400_BAD_REQUEST
assert not MailAccount.objects.exists()
def test_invalid_state(
self,
client: Client,
mail_user: User,
oauth_settings: SettingsWrapper,
) -> None:
"""
GIVEN:
- OAuth client IDs and secrets configured
WHEN:
- The OAuth callback is called with a state that does not match the session
THEN:
- 400 Bad Request is returned and no mail account is created
"""
response = client.get(
"/api/oauth/callback/?code=test_code&state=invalid_state",
)
assert response.status_code == status.HTTP_400_BAD_REQUEST
assert not MailAccount.objects.exists()
def test_insufficient_permissions(
self,
client: Client,
mail_user: User,
oauth_settings: SettingsWrapper,
) -> None:
"""
GIVEN:
- OAuth client IDs and secrets configured
- User without add_mailaccount permission
WHEN:
- The OAuth callback is called
THEN:
- 400 Bad Request is returned and no mail account is created
"""
mail_user.user_permissions.remove(
*Permission.objects.filter(codename="add_mailaccount"),
)
response = client.get(
"/api/oauth/callback/?code=test_code&scope=https://mail.google.com/",
)
assert response.status_code == status.HTTP_400_BAD_REQUEST
assert not MailAccount.objects.exists()
@pytest.mark.parametrize(
("provider", "callback_query", "expected_imap"),
[
pytest.param(
"gmail",
"code=test_code&scope=https://mail.google.com/&state=test_state",
"imap.gmail.com",
id="gmail",
),
pytest.param(
"outlook",
"code=test_code&state=test_state",
"outlook.office365.com",
id="outlook",
),
],
@mock.patch(
"paperless_mail.oauth.PaperlessMailOAuth2Manager.get_outlook_access_token",
)
def test_success(
def test_oauth_callback_view_success(
self,
client: Client,
mail_user: User,
oauth_settings: SettingsWrapper,
oauth_session: Client,
mocker: pytest_mock.MockerFixture,
provider: str,
callback_query: str,
expected_imap: str,
mock_get_outlook_access_token,
mock_get_gmail_access_token,
) -> None:
"""
GIVEN:
- OAuth client IDs and secrets configured for Gmail and Outlook
- A valid oauth_state seeded in the session
- Mocked settings for Gmail and Outlook OAuth client IDs and secrets
WHEN:
- The OAuth callback is called with a code and provider-specific scope
- OAuth callback is called with a code and scope
- OAuth callback is called with a code and no scope
THEN:
- A redirect with oauth_success=1 is returned
- The provider's access-token method is invoked
- A mail account for the matching provider is created
- Gmail mail account is created
- Outlook mail account is created
"""
token_payload = {
mock_get_gmail_access_token.return_value = {
"access_token": "test_access_token",
"refresh_token": "test_refresh_token",
"expires_in": 3600,
}
mocked = mocker.patch(
f"paperless_mail.oauth.PaperlessMailOAuth2Manager.get_{provider}_access_token",
return_value=token_payload,
mock_get_outlook_access_token.return_value = {
"access_token": "test_access_token",
"refresh_token": "test_refresh_token",
"expires_in": 3600,
}
session = self.client.session
session.update(
{
"oauth_state": "test_state",
},
)
session.save()
# Test Google OAuth callback
response = self.client.get(
"/api/oauth/callback/?code=test_code&scope=https://mail.google.com/&state=test_state",
)
self.assertEqual(response.status_code, status.HTTP_302_FOUND)
self.assertIn("oauth_success=1", response.url)
mock_get_gmail_access_token.assert_called_once()
self.assertTrue(
MailAccount.objects.filter(imap_server="imap.gmail.com").exists(),
)
response = client.get(f"/api/oauth/callback/?{callback_query}")
# Test Outlook OAuth callback
response = self.client.get(
"/api/oauth/callback/?code=test_code&state=test_state",
)
self.assertEqual(response.status_code, status.HTTP_302_FOUND)
self.assertIn("oauth_success=1", response.url)
self.assertTrue(
MailAccount.objects.filter(imap_server="outlook.office365.com").exists(),
)
assert response.status_code == status.HTTP_302_FOUND
assert "oauth_success=1" in response.url
mocked.assert_called_once()
assert MailAccount.objects.filter(imap_server=expected_imap).exists()
@mock.patch("httpx_oauth.oauth2.BaseOAuth2.get_access_token")
def test_oauth_callback_view_fails(self, mock_get_access_token) -> None:
"""
GIVEN:
- Mocked settings for Gmail and Outlook OAuth client IDs and secrets
WHEN:
- OAuth callback is called and get access token returns an error
THEN:
- No mail account is created
- Error is logged
"""
mock_get_access_token.side_effect = GetAccessTokenError("test_error")
@pytest.mark.parametrize(
("callback_query", "imap_server"),
[
pytest.param(
"code=test_code&scope=https://mail.google.com/&state=test_state",
"imap.gmail.com",
id="gmail",
session = self.client.session
session.update(
{
"oauth_state": "test_state",
},
)
session.save()
with self.assertLogs("paperless_mail", level="ERROR") as cm:
# Test Google OAuth callback
response = self.client.get(
"/api/oauth/callback/?code=test_code&scope=https://mail.google.com/&state=test_state",
)
self.assertEqual(response.status_code, status.HTTP_302_FOUND)
self.assertIn("oauth_success=0", response.url)
self.assertFalse(
MailAccount.objects.filter(imap_server="imap.gmail.com").exists(),
)
# Test Outlook OAuth callback
response = self.client.get(
"/api/oauth/callback/?code=test_code&state=test_state",
)
self.assertEqual(response.status_code, status.HTTP_302_FOUND)
self.assertIn("oauth_success=0", response.url)
self.assertFalse(
MailAccount.objects.filter(
imap_server="outlook.office365.com",
).exists(),
)
self.assertIn(
"Error getting access token from OAuth provider",
cm.output[0],
)
def test_oauth_callback_view_insufficient_permissions(self) -> None:
"""
GIVEN:
- Mocked settings for Gmail and Outlook OAuth client IDs and secrets
- User without add_mailaccount permission
WHEN:
- OAuth callback is called
THEN:
- 400 bad request returned, no mail accounts are created
"""
self.user.user_permissions.remove(
*Permission.objects.filter(
codename__in=[
"add_mailaccount",
],
),
pytest.param(
"code=test_code&state=test_state",
"outlook.office365.com",
id="outlook",
),
],
)
self.user.save()
response = self.client.get(
"/api/oauth/callback/?code=test_code&scope=https://mail.google.com/",
)
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
self.assertFalse(
MailAccount.objects.filter(imap_server="imap.gmail.com").exists(),
)
self.assertFalse(
MailAccount.objects.filter(imap_server="outlook.office365.com").exists(),
)
def test_oauth_callback_view_no_code(self) -> None:
"""
GIVEN:
- Mocked settings for Gmail and Outlook OAuth client IDs and secrets
WHEN:
- OAuth callback is called without a code
THEN:
- 400 bad request returned, no mail accounts are created
"""
response = self.client.get(
"/api/oauth/callback/",
)
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
self.assertFalse(
MailAccount.objects.filter(imap_server="imap.gmail.com").exists(),
)
self.assertFalse(
MailAccount.objects.filter(imap_server="outlook.office365.com").exists(),
)
def test_oauth_callback_view_invalid_state(self) -> None:
"""
GIVEN:
- Mocked settings for Gmail and Outlook OAuth client IDs and secrets
WHEN:
- OAuth callback is called with an invalid state
THEN:
- 400 bad request returned, no mail accounts are created
"""
response = self.client.get(
"/api/oauth/callback/?code=test_code&state=invalid_state",
)
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
self.assertFalse(
MailAccount.objects.filter(imap_server="imap.gmail.com").exists(),
)
self.assertFalse(
MailAccount.objects.filter(imap_server="outlook.office365.com").exists(),
)
@mock.patch("paperless_mail.mail.get_mailbox")
@mock.patch(
"httpx_oauth.oauth2.BaseOAuth2.refresh_token",
)
def test_provider_error(
def test_refresh_token_on_handle_mail_account(
self,
client: Client,
mail_user: User,
oauth_settings: SettingsWrapper,
oauth_session: Client,
mocker: pytest_mock.MockerFixture,
caplog: pytest.LogCaptureFixture,
callback_query: str,
imap_server: str,
mock_refresh_token,
mock_get_mailbox,
) -> None:
"""
GIVEN:
- OAuth client IDs and secrets configured
- The provider's access-token endpoint raises GetAccessTokenError
- Mail account with refresh token and expiration
WHEN:
- The OAuth callback is called with a code (Gmail or Outlook)
- handle_mail_account is called
THEN:
- A redirect with oauth_success=0 is returned
- No mail account is created
- The failure is logged at ERROR level
- Refresh token is called
"""
mocker.patch(
"httpx_oauth.oauth2.BaseOAuth2.get_access_token",
side_effect=GetAccessTokenError("test_error"),
)
with caplog.at_level("ERROR", logger="paperless_mail"):
response = client.get(f"/api/oauth/callback/?{callback_query}")
mock_mailbox = mock.MagicMock()
mock_get_mailbox.return_value.__enter__.return_value = mock_mailbox
assert response.status_code == status.HTTP_302_FOUND
assert "oauth_success=0" in response.url
assert not MailAccount.objects.filter(imap_server=imap_server).exists()
assert any(
"Error getting access token from OAuth provider" in record.message
for record in caplog.records
)
@pytest.fixture
def expired_oauth_account_factory(db: None, mocker: pytest_mock.MockerFixture):
"""
Build an OAuth-backed MailAccount whose access token has already expired,
while patching `get_mailbox` so no real IMAP connection is attempted.
"""
mocker.patch(
"paperless_mail.mail.get_mailbox",
).return_value.__enter__.return_value = mocker.MagicMock()
def _make(account_type: MailAccount.MailAccountType) -> MailAccount:
return MailAccountFactory(
mail_account = MailAccountFactory(
name="Test Gmail Mail Account",
username="test_username",
account_type=account_type,
account_type=MailAccount.MailAccountType.GMAIL_OAUTH,
is_token=True,
refresh_token="test_refresh_token",
expiration=timezone.now() - timedelta(days=1),
)
return _make
mock_refresh_token.return_value = {
"access_token": "test_access_token",
"refresh_token": "test_refresh_token",
"expires_in": 3600,
}
self.mail_account_handler.handle_mail_account(mail_account)
mock_refresh_token.assert_called_once()
mock_refresh_token.reset_mock()
@pytest.mark.django_db
class TestRefreshTokenOnHandleMailAccount:
"""OAuth refresh-token flow exercised through MailAccountHandler.handle_mail_account."""
mock_refresh_token.return_value = {
"access_token": "test_access_token",
"refresh_token": "test_refresh",
"expires_in": 3600,
}
outlook_mail_account = MailAccountFactory(
name="Test Outlook Mail Account",
username="test_username",
account_type=MailAccount.MailAccountType.OUTLOOK_OAUTH,
is_token=True,
refresh_token="test_refresh_token",
expiration=timezone.now() - timedelta(days=1),
)
@pytest.mark.parametrize(
"account_type",
[
pytest.param(MailAccount.MailAccountType.GMAIL_OAUTH, id="gmail"),
pytest.param(MailAccount.MailAccountType.OUTLOOK_OAUTH, id="outlook"),
],
self.mail_account_handler.handle_mail_account(outlook_mail_account)
mock_refresh_token.assert_called_once()
@mock.patch("paperless_mail.mail.get_mailbox")
@mock.patch(
"httpx_oauth.oauth2.BaseOAuth2.refresh_token",
)
def test_refresh_token_called(
def test_refresh_token_on_handle_mail_account_fails(
self,
mocker: pytest_mock.MockerFixture,
mail_account_handler: MailAccountHandler,
expired_oauth_account_factory,
account_type: MailAccount.MailAccountType,
mock_refresh_token,
mock_get_mailbox,
) -> None:
"""
GIVEN:
- An OAuth-backed mail account with a refresh token and an expired access token
WHEN:
- handle_mail_account is called
THEN:
- The OAuth refresh_token endpoint is invoked exactly once
"""
mock_refresh = mocker.patch(
"httpx_oauth.oauth2.BaseOAuth2.refresh_token",
return_value={
"access_token": "test_access_token",
"refresh_token": "test_refresh_token",
"expires_in": 3600,
},
)
mail_account_handler.handle_mail_account(
expired_oauth_account_factory(account_type),
)
mock_refresh.assert_called_once()
def test_refresh_token_failure(
self,
mocker: pytest_mock.MockerFixture,
caplog: pytest.LogCaptureFixture,
mail_account_handler: MailAccountHandler,
expired_oauth_account_factory,
) -> None:
"""
GIVEN:
- An OAuth-backed mail account with a refresh token and an expired access token
- The OAuth refresh_token endpoint raises RefreshTokenError
- Mail account with refresh token and expiration
WHEN:
- handle_mail_account is called
- Refresh token is called but fails
THEN:
- Error is logged
- 0 processed mails is returned
- The failure is logged at ERROR level with the account context
"""
mock_refresh = mocker.patch(
"httpx_oauth.oauth2.BaseOAuth2.refresh_token",
side_effect=RefreshTokenError("test_error"),
mock_mailbox = mock.MagicMock()
mock_get_mailbox.return_value.__enter__.return_value = mock_mailbox
mail_account = MailAccountFactory(
name="Test Gmail Mail Account",
username="test_username",
account_type=MailAccount.MailAccountType.GMAIL_OAUTH,
is_token=True,
refresh_token="test_refresh_token",
expiration=timezone.now() - timedelta(days=1),
)
account = expired_oauth_account_factory(
MailAccount.MailAccountType.GMAIL_OAUTH,
)
mock_refresh_token.side_effect = RefreshTokenError("test_error")
with caplog.at_level("ERROR", logger="paperless_mail"):
result = mail_account_handler.handle_mail_account(account)
assert result == 0
mock_refresh.assert_called_once()
assert any(
f"Failed to refresh oauth token for account {account}: test_error"
in record.message
for record in caplog.records
)
with self.assertLogs("paperless_mail", level="ERROR") as cm:
# returns 0 processed mails
self.assertEqual(
self.mail_account_handler.handle_mail_account(mail_account),
0,
)
mock_refresh_token.assert_called_once()
self.assertIn(
f"Failed to refresh oauth token for account {mail_account}: test_error",
cm.output[0],
)
+174 -209
View File
@@ -1,68 +1,64 @@
import email
import email.contentmanager
import shutil
import subprocess
from collections.abc import Generator
import tempfile
from email.message import Message
from email.mime.application import MIMEApplication
from email.mime.multipart import MIMEMultipart
from pathlib import Path
from unittest import mock
import gnupg
import pytest
from django.test import override_settings
from imap_tools import MailMessage
from pytest_django.fixtures import SettingsWrapper
from pytest_mock import MockerFixture
from paperless_mail.mail import MailAccountHandler
from paperless_mail.models import MailRule
from paperless_mail.preprocessor import MailMessageDecryptor
from paperless_mail.tests.factories import MailAccountFactory
from paperless_mail.tests.test_mail import MailMocker
from paperless_mail.tests.test_mail import TestMail
from paperless_mail.tests.test_mail import _AttachmentDef
def _kill_gpg_agent(gpg_home: str) -> None:
"""
Terminate any gpg-agent attached to `gpg_home` so the directory is
removable. Uses `gpgconf --kill`, the GnuPG project's recommended cleanup
path; python-gnupg has no built-in cleanup since it only wraps the CLI.
"""
try:
subprocess.run(
["gpgconf", "--kill", "gpg-agent"],
env={"GNUPGHOME": gpg_home},
check=False,
capture_output=True,
timeout=5,
)
except (FileNotFoundError, subprocess.TimeoutExpired):
pass
class MessageEncryptor:
"""
Test helper: generates a throwaway GPG keypair in a tempdir and exposes
`encrypt(MailMessage) -> MailMessage`.
"""
TEST_USER = "testuser@example.com"
def __init__(self, gpg_home: Path) -> None:
self.gpg_home = str(gpg_home)
def __init__(self) -> None:
self.gpg_home = tempfile.mkdtemp()
self.gpg = gnupg.GPG(gnupghome=self.gpg_home)
self.gpg.gen_key(
self.gpg.gen_key_input(
name_email=self.TEST_USER,
passphrase=None,
key_type="RSA",
key_length=2048,
expire_date=0,
no_protection=True,
),
self._testUser = "testuser@example.com"
# Generate a new key
input_data = self.gpg.gen_key_input(
name_email=self._testUser,
passphrase=None,
key_type="RSA",
key_length=2048,
expire_date=0,
no_protection=True,
)
self.gpg.gen_key(input_data)
def kill_agent(self) -> None:
_kill_gpg_agent(self.gpg_home)
def cleanup(self) -> None:
"""
Kill the gpg-agent process and clean up the temporary GPG home directory.
This uses gpgconf to properly terminate the agent, which is the officially
recommended cleanup method from the GnuPG project. python-gnupg does not
provide built-in cleanup methods as it's only a wrapper around the gpg CLI.
"""
# Kill the gpg-agent using the official GnuPG cleanup tool
try:
subprocess.run(
["gpgconf", "--kill", "gpg-agent"],
env={"GNUPGHOME": self.gpg_home},
check=False,
capture_output=True,
timeout=5,
)
except (FileNotFoundError, subprocess.TimeoutExpired):
# gpgconf not found or hung - agent will timeout eventually
pass
# Clean up the temporary directory
shutil.rmtree(self.gpg_home, ignore_errors=True)
@staticmethod
def get_email_body_without_headers(email_message: Message) -> bytes:
@@ -70,6 +66,7 @@ class MessageEncryptor:
Filters some relevant headers from an EmailMessage and returns just the body.
"""
message_copy = email.message_from_bytes(email_message.as_bytes())
message_copy._headers = [
header
for header in message_copy._headers
@@ -77,235 +74,203 @@ class MessageEncryptor:
]
return message_copy.as_bytes()
def encrypt(self, message: MailMessage) -> MailMessage:
original_email: Message = message.obj
def encrypt(self, message):
original_email: email.message.Message = message.obj
encrypted_data = self.gpg.encrypt(
self.get_email_body_without_headers(original_email),
self.TEST_USER,
self._testUser,
armor=True,
)
if not encrypted_data.ok:
raise Exception(f"Encryption failed: {encrypted_data.stderr}")
encrypted_email_content = encrypted_data.data
new_email = MIMEMultipart("encrypted", protocol="application/pgp-encrypted")
new_email["From"] = original_email["From"]
new_email["To"] = original_email["To"]
new_email["Subject"] = original_email["Subject"]
# Add the control part
control_part = MIMEApplication(_data=b"", _subtype="pgp-encrypted")
control_part.set_payload("Version: 1")
new_email.attach(control_part)
# Add the encrypted data part
encrypted_part = MIMEApplication(_data=b"", _subtype="octet-stream")
encrypted_part.set_payload(encrypted_data.data.decode("ascii"))
encrypted_part.set_payload(encrypted_email_content.decode("ascii"))
encrypted_part.add_header(
"Content-Disposition",
'attachment; filename="encrypted.asc"',
)
new_email.attach(encrypted_part)
return MailMessage(
encrypted_message: MailMessage = MailMessage(
[(f"UID {message.uid}".encode(), new_email.as_bytes())],
)
return encrypted_message
@pytest.fixture(scope="session")
def message_encryptor(
tmp_path_factory: pytest.TempPathFactory,
) -> Generator[MessageEncryptor, None, None]:
"""
Session-scoped: GPG keypair generation is slow (~1s+), and nothing in
these tests mutates the keyring after creation. The GPG home directory
comes from `tmp_path_factory` so pytest cleans it up at session end;
we still kill the gpg-agent ourselves so the dir is removable.
"""
encryptor = MessageEncryptor(tmp_path_factory.mktemp("gpg-home"))
yield encryptor
encryptor.kill_agent()
class TestMailMessageGpgDecryptor(TestMail):
@classmethod
def setUpClass(cls) -> None:
"""Create GPG encryptor once for all tests in this class."""
super().setUpClass()
cls.messageEncryptor = MessageEncryptor()
@classmethod
def tearDownClass(cls) -> None:
"""Clean up GPG resources after all tests complete."""
if hasattr(cls, "messageEncryptor"):
cls.messageEncryptor.cleanup()
super().tearDownClass()
@pytest.fixture
def gpg_settings(
settings: SettingsWrapper,
message_encryptor: MessageEncryptor,
) -> SettingsWrapper:
settings.EMAIL_GNUPG_HOME = message_encryptor.gpg_home
settings.EMAIL_ENABLE_GPG_DECRYPTOR = True
return settings
def setUp(self) -> None:
with override_settings(
EMAIL_GNUPG_HOME=self.messageEncryptor.gpg_home,
EMAIL_ENABLE_GPG_DECRYPTOR=True,
):
super().setUp()
def test_preprocessor_is_able_to_run(self) -> None:
with override_settings(
EMAIL_GNUPG_HOME=self.messageEncryptor.gpg_home,
EMAIL_ENABLE_GPG_DECRYPTOR=True,
):
self.assertTrue(MailMessageDecryptor.able_to_run())
@pytest.fixture
def encrypted_pair(
mail_mocker: MailMocker,
message_encryptor: MessageEncryptor,
) -> tuple[MailMessage, MailMessage]:
"""
Build a (encrypted, plaintext) MailMessage pair sharing the same UID and
headers, with two PDF attachments on the plaintext side.
"""
plaintext = mail_mocker.messageBuilder.create_message(
body="Test message with 2 attachments",
attachments=[
_AttachmentDef(filename="f1.pdf", disposition="inline"),
_AttachmentDef(filename="f2.pdf"),
],
)
return message_encryptor.encrypt(plaintext), plaintext
def test_preprocessor_is_able_to_run2(self) -> None:
with override_settings(
EMAIL_GNUPG_HOME=None,
EMAIL_ENABLE_GPG_DECRYPTOR=True,
):
self.assertTrue(MailMessageDecryptor.able_to_run())
def test_is_not_able_to_run_disabled(self) -> None:
with override_settings(
EMAIL_ENABLE_GPG_DECRYPTOR=False,
):
self.assertFalse(MailMessageDecryptor.able_to_run())
# Sentinel used in `test_able_to_run` parametrization to request the real
# GPG home from the session-scoped `message_encryptor` fixture at runtime.
_VALID_GPG_HOME = object()
def test_is_not_able_to_run_bogus_path(self) -> None:
with override_settings(
EMAIL_ENABLE_GPG_DECRYPTOR=True,
EMAIL_GNUPG_HOME="_)@# notapath &%#$",
):
self.assertFalse(MailMessageDecryptor.able_to_run())
class TestMailMessageDecryptorAbleToRun:
"""`MailMessageDecryptor.able_to_run()` configuration matrix."""
@pytest.mark.parametrize(
("settings_overrides", "expected"),
[
pytest.param(
{
"EMAIL_GNUPG_HOME": _VALID_GPG_HOME,
"EMAIL_ENABLE_GPG_DECRYPTOR": True,
},
True,
id="enabled-with-valid-home",
def test_fails_at_initialization(self) -> None:
with (
mock.patch("gnupg.GPG.__init__") as mock_run,
override_settings(
EMAIL_ENABLE_GPG_DECRYPTOR=True,
),
pytest.param(
{"EMAIL_GNUPG_HOME": None, "EMAIL_ENABLE_GPG_DECRYPTOR": True},
True,
id="enabled-with-default-home",
),
pytest.param(
{"EMAIL_ENABLE_GPG_DECRYPTOR": False},
False,
id="disabled",
),
pytest.param(
{
"EMAIL_ENABLE_GPG_DECRYPTOR": True,
"EMAIL_GNUPG_HOME": "_)@# notapath &%#$",
},
False,
id="enabled-with-bogus-path",
),
],
)
def test_able_to_run(
self,
settings: SettingsWrapper,
message_encryptor: MessageEncryptor,
settings_overrides: dict,
*,
expected: bool,
) -> None:
for key, value in settings_overrides.items():
if value is _VALID_GPG_HOME:
value = message_encryptor.gpg_home
setattr(settings, key, value)
assert MailMessageDecryptor.able_to_run() is expected
):
def side_effect(*args, **kwargs):
raise OSError("Cannot find 'gpg' binary")
@pytest.mark.django_db
class TestMailMessageDecryptor:
"""End-to-end decrypt and consumption flow with a real GPG keyring."""
mock_run.side_effect = side_effect
def test_fails_at_initialization(
self,
settings: SettingsWrapper,
mocker: MockerFixture,
) -> None:
settings.EMAIL_ENABLE_GPG_DECRYPTOR = True
mocker.patch(
"gnupg.GPG.__init__",
side_effect=OSError("Cannot find 'gpg' binary"),
)
handler = MailAccountHandler()
assert len(handler._message_preprocessors) == 0
def test_decrypt_fails(
self,
settings: SettingsWrapper,
encrypted_pair: tuple[MailMessage, MailMessage],
tmp_path: Path,
) -> None:
"""
A decryptor pointed at a fresh empty GPG home cannot decrypt the
message — ensure it surfaces an exception rather than silently passing
bytes through.
"""
encrypted_message, _ = encrypted_pair
empty_gpg_home = tmp_path / "empty-gpg-home"
empty_gpg_home.mkdir()
settings.EMAIL_ENABLE_GPG_DECRYPTOR = True
settings.EMAIL_GNUPG_HOME = str(empty_gpg_home)
handler = MailAccountHandler()
self.assertEqual(len(handler._message_preprocessors), 0)
def test_decrypt_fails(self) -> None:
encrypted_message, _ = self.create_encrypted_unencrypted_message_pair()
# This test creates its own empty GPG home to test decryption failure
empty_gpg_home = tempfile.mkdtemp()
try:
with pytest.raises(Exception):
MailMessageDecryptor().run(encrypted_message)
with override_settings(
EMAIL_ENABLE_GPG_DECRYPTOR=True,
EMAIL_GNUPG_HOME=empty_gpg_home,
):
message_decryptor = MailMessageDecryptor()
self.assertRaises(Exception, message_decryptor.run, encrypted_message)
finally:
_kill_gpg_agent(str(empty_gpg_home))
# Clean up the temporary GPG home used only by this test
try:
subprocess.run(
["gpgconf", "--kill", "gpg-agent"],
env={"GNUPGHOME": empty_gpg_home},
check=False,
capture_output=True,
timeout=5,
)
except (FileNotFoundError, subprocess.TimeoutExpired):
pass
shutil.rmtree(empty_gpg_home, ignore_errors=True)
def test_decrypt_encrypted_mail(
self,
gpg_settings: SettingsWrapper,
encrypted_pair: tuple[MailMessage, MailMessage],
) -> None:
def test_decrypt_encrypted_mail(self) -> None:
"""
Creates a mail with attachments. Then encrypts it with a new key.
Verifies that this encrypted message can be decrypted with attachments intact.
"""
encrypted_message, plaintext = encrypted_pair
encrypted_message, message = self.create_encrypted_unencrypted_message_pair()
headers = message.headers
text = message.text
assert len(encrypted_message.attachments) == 1
assert encrypted_message.attachments[0].filename == "encrypted.asc"
assert encrypted_message.text == ""
self.assertEqual(len(encrypted_message.attachments), 1)
self.assertEqual(encrypted_message.attachments[0].filename, "encrypted.asc")
self.assertEqual(encrypted_message.text, "")
decryptor = MailMessageDecryptor()
assert decryptor.able_to_run()
decrypted = decryptor.run(encrypted_message)
with override_settings(
EMAIL_ENABLE_GPG_DECRYPTOR=True,
EMAIL_GNUPG_HOME=self.messageEncryptor.gpg_home,
):
message_decryptor = MailMessageDecryptor()
self.assertTrue(message_decryptor.able_to_run())
decrypted_message = message_decryptor.run(encrypted_message)
assert len(decrypted.attachments) == 2
assert decrypted.attachments[0].filename == "f1.pdf"
assert decrypted.attachments[1].filename == "f2.pdf"
assert decrypted.headers == plaintext.headers
assert decrypted.text == plaintext.text
assert decrypted.uid == plaintext.uid
self.assertEqual(len(decrypted_message.attachments), 2)
self.assertEqual(decrypted_message.attachments[0].filename, "f1.pdf")
self.assertEqual(decrypted_message.attachments[1].filename, "f2.pdf")
self.assertEqual(decrypted_message.headers, headers)
self.assertEqual(decrypted_message.text, text)
self.assertEqual(decrypted_message.uid, message.uid)
def test_handle_encrypted_message(
self,
gpg_settings: SettingsWrapper,
mail_mocker: MailMocker,
message_encryptor: MessageEncryptor,
) -> None:
plaintext = mail_mocker.messageBuilder.create_message(
def create_encrypted_unencrypted_message_pair(self):
message = self.mailMocker.messageBuilder.create_message(
body="Test message with 2 attachments",
attachments=[
_AttachmentDef(
filename="f1.pdf",
disposition="inline",
),
_AttachmentDef(filename="f2.pdf"),
],
)
encrypted_message = self.messageEncryptor.encrypt(message)
return encrypted_message, message
def test_handle_encrypted_message(self) -> None:
message = self.mailMocker.messageBuilder.create_message(
subject="the message title",
from_="Myself",
attachments=2,
body="Test mail",
)
encrypted = message_encryptor.encrypt(plaintext)
rule = MailRule.objects.create(
encrypted_message = self.messageEncryptor.encrypt(message)
account = MailAccountFactory()
rule = MailRule(
assign_title_from=MailRule.TitleSource.FROM_FILENAME,
consumption_scope=MailRule.ConsumptionScope.EVERYTHING,
account=MailAccountFactory(),
account=account,
)
rule.save()
result = MailAccountHandler()._handle_message(encrypted, rule)
result = self.mail_account_handler._handle_message(encrypted_message, rule)
assert result == 3
mail_mocker._queue_consumption_tasks_mock.assert_called()
mail_mocker.assert_queue_consumption_tasks_call_args(
self.assertEqual(result, 3)
self.mailMocker._queue_consumption_tasks_mock.assert_called()
self.mailMocker.assert_queue_consumption_tasks_call_args(
[
[
{
"override_title": plaintext.subject,
"override_filename": f"{plaintext.subject}.eml",
"override_title": message.subject,
"override_filename": f"{message.subject}.eml",
},
],
[