mirror of
https://github.com/paperless-ngx/paperless-ngx.git
synced 2026-06-21 12:54:22 +00:00
Merge branch 'main' into dev
# Conflicts: # docs/setup.md # src-ui/src/main.ts # src/documents/tests/test_api_bulk_edit.py # src/documents/tests/test_api_custom_fields.py # src/documents/tests/test_api_search.py # src/documents/tests/test_api_status.py # src/documents/tests/test_workflows.py # src/paperless_mail/tests/test_api.py
This commit is contained in:
@@ -1,5 +1,32 @@
|
||||
# Changelog
|
||||
|
||||
## paperless-ngx 2.20.12
|
||||
|
||||
### Security
|
||||
|
||||
- Resolve [GHSA-96jx-fj7m-qh6x](https://github.com/paperless-ngx/paperless-ngx/security/advisories/GHSA-96jx-fj7m-qh6x)
|
||||
|
||||
### Bug Fixes
|
||||
|
||||
- Fix: Scope the workflow saves to prevent clobbering filename/archive_filename [@stumpylog](https://github.com/stumpylog) ([#12390](https://github.com/paperless-ngx/paperless-ngx/pull/12390))
|
||||
- Fix: don't try to usermod/groupmod when non-root + update docs (#<!---->12365) [@stumpylog](https://github.com/stumpylog) ([#12391](https://github.com/paperless-ngx/paperless-ngx/pull/12391))
|
||||
- Fix: avoid moving files if already moved [@shamoon](https://github.com/shamoon) ([#12389](https://github.com/paperless-ngx/paperless-ngx/pull/12389))
|
||||
- Fix: remove pagination from document notes api spec [@shamoon](https://github.com/shamoon) ([#12388](https://github.com/paperless-ngx/paperless-ngx/pull/12388))
|
||||
- Fix: fix file button hover color in dark mode [@shamoon](https://github.com/shamoon) ([#12367](https://github.com/paperless-ngx/paperless-ngx/pull/12367))
|
||||
- Fixhancement: only offer basic auth for appropriate requests [@shamoon](https://github.com/shamoon) ([#12362](https://github.com/paperless-ngx/paperless-ngx/pull/12362))
|
||||
|
||||
### All App Changes
|
||||
|
||||
<details>
|
||||
<summary>5 changes</summary>
|
||||
|
||||
- Fix: Scope the workflow saves to prevent clobbering filename/archive_filename [@stumpylog](https://github.com/stumpylog) ([#12390](https://github.com/paperless-ngx/paperless-ngx/pull/12390))
|
||||
- Fix: avoid moving files if already moved [@shamoon](https://github.com/shamoon) ([#12389](https://github.com/paperless-ngx/paperless-ngx/pull/12389))
|
||||
- Fix: remove pagination from document notes api spec [@shamoon](https://github.com/shamoon) ([#12388](https://github.com/paperless-ngx/paperless-ngx/pull/12388))
|
||||
- Fix: fix file button hover color in dark mode [@shamoon](https://github.com/shamoon) ([#12367](https://github.com/paperless-ngx/paperless-ngx/pull/12367))
|
||||
- Fixhancement: only offer basic auth for appropriate requests [@shamoon](https://github.com/shamoon) ([#12362](https://github.com/paperless-ngx/paperless-ngx/pull/12362))
|
||||
</details>
|
||||
|
||||
## paperless-ngx 2.20.11
|
||||
|
||||
### Security
|
||||
|
||||
+1
-1
@@ -1,6 +1,6 @@
|
||||
[project]
|
||||
name = "paperless-ngx"
|
||||
version = "2.20.11"
|
||||
version = "2.20.13"
|
||||
description = "A community-supported supercharged document management system: scan, index and archive all your physical documents"
|
||||
readme = "README.md"
|
||||
requires-python = ">=3.11"
|
||||
|
||||
+1
-1
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "paperless-ngx-ui",
|
||||
"version": "2.20.11",
|
||||
"version": "2.20.13",
|
||||
"scripts": {
|
||||
"preinstall": "npx only-allow pnpm",
|
||||
"ng": "ng",
|
||||
|
||||
@@ -0,0 +1,122 @@
|
||||
import {
|
||||
HttpErrorResponse,
|
||||
HttpHandlerFn,
|
||||
HttpRequest,
|
||||
} from '@angular/common/http'
|
||||
import { throwError } from 'rxjs'
|
||||
import * as navUtils from '../utils/navigation'
|
||||
import { createAuthExpiryInterceptor } from './auth-expiry.interceptor'
|
||||
|
||||
describe('withAuthExpiryInterceptor', () => {
|
||||
let interceptor: ReturnType<typeof createAuthExpiryInterceptor>
|
||||
let dateNowSpy: jest.SpiedFunction<typeof Date.now>
|
||||
|
||||
beforeEach(() => {
|
||||
interceptor = createAuthExpiryInterceptor()
|
||||
dateNowSpy = jest.spyOn(Date, 'now').mockReturnValue(1000)
|
||||
})
|
||||
|
||||
afterEach(() => {
|
||||
jest.restoreAllMocks()
|
||||
})
|
||||
|
||||
it('reloads when an API request returns 401', () => {
|
||||
const reloadSpy = jest
|
||||
.spyOn(navUtils, 'locationReload')
|
||||
.mockImplementation(() => {})
|
||||
|
||||
interceptor(
|
||||
new HttpRequest('GET', '/api/documents/'),
|
||||
failingHandler('/api/documents/', 401)
|
||||
).subscribe({
|
||||
error: () => undefined,
|
||||
})
|
||||
|
||||
expect(reloadSpy).toHaveBeenCalledTimes(1)
|
||||
})
|
||||
|
||||
it('does not reload for non-401 errors', () => {
|
||||
const reloadSpy = jest
|
||||
.spyOn(navUtils, 'locationReload')
|
||||
.mockImplementation(() => {})
|
||||
|
||||
interceptor(
|
||||
new HttpRequest('GET', '/api/documents/'),
|
||||
failingHandler('/api/documents/', 500)
|
||||
).subscribe({
|
||||
error: () => undefined,
|
||||
})
|
||||
|
||||
expect(reloadSpy).not.toHaveBeenCalled()
|
||||
})
|
||||
|
||||
it('does not reload for non-api 401 responses', () => {
|
||||
const reloadSpy = jest
|
||||
.spyOn(navUtils, 'locationReload')
|
||||
.mockImplementation(() => {})
|
||||
|
||||
interceptor(
|
||||
new HttpRequest('GET', '/accounts/profile/'),
|
||||
failingHandler('/accounts/profile/', 401)
|
||||
).subscribe({
|
||||
error: () => undefined,
|
||||
})
|
||||
|
||||
expect(reloadSpy).not.toHaveBeenCalled()
|
||||
})
|
||||
|
||||
it('reloads only once even with multiple API 401 responses', () => {
|
||||
const reloadSpy = jest
|
||||
.spyOn(navUtils, 'locationReload')
|
||||
.mockImplementation(() => {})
|
||||
|
||||
const request = new HttpRequest('GET', '/api/documents/')
|
||||
const handler = failingHandler('/api/documents/', 401)
|
||||
|
||||
interceptor(request, handler).subscribe({
|
||||
error: () => undefined,
|
||||
})
|
||||
interceptor(request, handler).subscribe({
|
||||
error: () => undefined,
|
||||
})
|
||||
|
||||
expect(reloadSpy).toHaveBeenCalledTimes(1)
|
||||
})
|
||||
|
||||
it('retries reload after cooldown for repeated API 401 responses', () => {
|
||||
const reloadSpy = jest
|
||||
.spyOn(navUtils, 'locationReload')
|
||||
.mockImplementation(() => {})
|
||||
|
||||
dateNowSpy
|
||||
.mockReturnValueOnce(1000)
|
||||
.mockReturnValueOnce(2500)
|
||||
.mockReturnValueOnce(3501)
|
||||
|
||||
const request = new HttpRequest('GET', '/api/documents/')
|
||||
const handler = failingHandler('/api/documents/', 401)
|
||||
|
||||
interceptor(request, handler).subscribe({
|
||||
error: () => undefined,
|
||||
})
|
||||
interceptor(request, handler).subscribe({
|
||||
error: () => undefined,
|
||||
})
|
||||
interceptor(request, handler).subscribe({
|
||||
error: () => undefined,
|
||||
})
|
||||
|
||||
expect(reloadSpy).toHaveBeenCalledTimes(2)
|
||||
})
|
||||
})
|
||||
|
||||
function failingHandler(url: string, status: number): HttpHandlerFn {
|
||||
return (_request) =>
|
||||
throwError(
|
||||
() =>
|
||||
new HttpErrorResponse({
|
||||
status,
|
||||
url,
|
||||
})
|
||||
)
|
||||
}
|
||||
@@ -0,0 +1,37 @@
|
||||
import {
|
||||
HttpErrorResponse,
|
||||
HttpEvent,
|
||||
HttpHandlerFn,
|
||||
HttpInterceptorFn,
|
||||
HttpRequest,
|
||||
} from '@angular/common/http'
|
||||
import { catchError, Observable, throwError } from 'rxjs'
|
||||
import { locationReload } from '../utils/navigation'
|
||||
|
||||
export const createAuthExpiryInterceptor = (): HttpInterceptorFn => {
|
||||
let lastReloadAttempt = Number.NEGATIVE_INFINITY
|
||||
|
||||
return (
|
||||
request: HttpRequest<unknown>,
|
||||
next: HttpHandlerFn
|
||||
): Observable<HttpEvent<unknown>> =>
|
||||
next(request).pipe(
|
||||
catchError((error: unknown) => {
|
||||
if (
|
||||
error instanceof HttpErrorResponse &&
|
||||
error.status === 401 &&
|
||||
request.url.includes('/api/')
|
||||
) {
|
||||
const now = Date.now()
|
||||
if (now - lastReloadAttempt >= 2000) {
|
||||
lastReloadAttempt = now
|
||||
locationReload()
|
||||
}
|
||||
}
|
||||
|
||||
return throwError(() => error)
|
||||
})
|
||||
)
|
||||
}
|
||||
|
||||
export const withAuthExpiryInterceptor = createAuthExpiryInterceptor()
|
||||
@@ -6,7 +6,7 @@ export const environment = {
|
||||
apiVersion: '10', // match src/paperless/settings.py
|
||||
appTitle: 'Paperless-ngx',
|
||||
tag: 'prod',
|
||||
version: '2.20.11',
|
||||
version: '2.20.13',
|
||||
webSocketHost: window.location.host,
|
||||
webSocketProtocol: window.location.protocol == 'https:' ? 'wss:' : 'ws:',
|
||||
webSocketBaseUrl: base_url.pathname + 'ws/',
|
||||
|
||||
+6
-1
@@ -154,6 +154,7 @@ import { DirtyDocGuard } from './app/guards/dirty-doc.guard'
|
||||
import { DirtySavedViewGuard } from './app/guards/dirty-saved-view.guard'
|
||||
import { PermissionsGuard } from './app/guards/permissions.guard'
|
||||
import { withApiVersionInterceptor } from './app/interceptors/api-version.interceptor'
|
||||
import { withAuthExpiryInterceptor } from './app/interceptors/auth-expiry.interceptor'
|
||||
import { withCsrfInterceptor } from './app/interceptors/csrf.interceptor'
|
||||
import { DocumentTitlePipe } from './app/pipes/document-title.pipe'
|
||||
import { FilterPipe } from './app/pipes/filter.pipe'
|
||||
@@ -399,7 +400,11 @@ bootstrapApplication(AppComponent, {
|
||||
StoragePathNamePipe,
|
||||
provideHttpClient(
|
||||
withInterceptorsFromDi(),
|
||||
withInterceptors([withCsrfInterceptor, withApiVersionInterceptor]),
|
||||
withInterceptors([
|
||||
withCsrfInterceptor,
|
||||
withApiVersionInterceptor,
|
||||
withAuthExpiryInterceptor,
|
||||
]),
|
||||
withFetch()
|
||||
),
|
||||
provideUiTour({
|
||||
|
||||
@@ -154,6 +154,11 @@ $form-check-radio-checked-bg-image-dark: url("data:image/svg+xml,<svg xmlns='htt
|
||||
--bs-list-group-action-active-color: var(--bs-body-color);
|
||||
--bs-list-group-action-active-bg: var(--pngx-bg-darker);
|
||||
}
|
||||
|
||||
.form-control:hover::file-selector-button {
|
||||
background-color:var(--pngx-bg-dark) !important
|
||||
}
|
||||
|
||||
.search-container {
|
||||
input, input:focus, i-bs[name="search"] , ::placeholder {
|
||||
color: var(--pngx-primary-text-contrast) !important;
|
||||
|
||||
@@ -477,7 +477,14 @@ class DelayedFullTextQuery(DelayedQuery):
|
||||
try:
|
||||
corrected = self.searcher.correct_query(q, q_str)
|
||||
if corrected.string != q_str:
|
||||
suggested_correction = corrected.string
|
||||
corrected_results = self.searcher.search(
|
||||
corrected.query,
|
||||
limit=1,
|
||||
filter=MappedDocIdSet(self.filter_queryset, self.searcher.ixreader),
|
||||
scored=False,
|
||||
)
|
||||
if len(corrected_results) > 0:
|
||||
suggested_correction = corrected.string
|
||||
except Exception as e:
|
||||
logger.info(
|
||||
"Error while correcting query %s: %s",
|
||||
|
||||
@@ -797,6 +797,25 @@ class ReadWriteSerializerMethodField(serializers.SerializerMethodField):
|
||||
return {self.field_name: data}
|
||||
|
||||
|
||||
def validate_documentlink_targets(user, doc_ids):
|
||||
if Document.objects.filter(id__in=doc_ids).count() != len(doc_ids):
|
||||
raise serializers.ValidationError(
|
||||
"Some documents in value don't exist or were specified twice.",
|
||||
)
|
||||
|
||||
if user is None:
|
||||
return
|
||||
|
||||
target_documents = Document.objects.filter(id__in=doc_ids).select_related("owner")
|
||||
if not all(
|
||||
has_perms_owner_aware(user, "change_document", document)
|
||||
for document in target_documents
|
||||
):
|
||||
raise PermissionDenied(
|
||||
_("Insufficient permissions."),
|
||||
)
|
||||
|
||||
|
||||
class CustomFieldInstanceSerializer(serializers.ModelSerializer):
|
||||
field = serializers.PrimaryKeyRelatedField(queryset=CustomField.objects.all())
|
||||
value = ReadWriteSerializerMethodField(allow_null=True)
|
||||
@@ -887,12 +906,11 @@ class CustomFieldInstanceSerializer(serializers.ModelSerializer):
|
||||
"Value must be a list",
|
||||
)
|
||||
doc_ids = data["value"]
|
||||
if Document.objects.filter(id__in=doc_ids).count() != len(
|
||||
data["value"],
|
||||
):
|
||||
raise serializers.ValidationError(
|
||||
"Some documents in value don't exist or were specified twice.",
|
||||
)
|
||||
request = self.context.get("request")
|
||||
validate_documentlink_targets(
|
||||
getattr(request, "user", None) if request is not None else None,
|
||||
doc_ids,
|
||||
)
|
||||
|
||||
return data
|
||||
|
||||
@@ -1713,6 +1731,19 @@ class BulkEditSerializer(
|
||||
f"Some custom fields in {name} don't exist or were specified twice.",
|
||||
)
|
||||
|
||||
if isinstance(custom_fields, dict):
|
||||
custom_field_map = CustomField.objects.in_bulk(ids)
|
||||
for raw_field_id, value in custom_fields.items():
|
||||
field = custom_field_map.get(int(raw_field_id))
|
||||
if (
|
||||
field is not None
|
||||
and field.data_type == CustomField.FieldDataType.DOCUMENTLINK
|
||||
and value is not None
|
||||
):
|
||||
if not isinstance(value, list):
|
||||
raise serializers.ValidationError("Value must be a list")
|
||||
validate_documentlink_targets(self.user, value)
|
||||
|
||||
def validate_method(self, method):
|
||||
if method == "set_correspondent":
|
||||
return bulk_edit.set_correspondent
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import hashlib
|
||||
import logging
|
||||
import shutil
|
||||
from pathlib import Path
|
||||
@@ -403,6 +404,14 @@ class CannotMoveFilesException(Exception):
|
||||
pass
|
||||
|
||||
|
||||
def _path_matches_checksum(path: Path, checksum: str | None) -> bool:
|
||||
if checksum is None or not path.is_file():
|
||||
return False
|
||||
|
||||
with path.open("rb") as f:
|
||||
return hashlib.md5(f.read()).hexdigest() == checksum
|
||||
|
||||
|
||||
def _filename_template_uses_custom_fields(doc: Document) -> bool:
|
||||
template = None
|
||||
if doc.storage_path is not None:
|
||||
@@ -473,10 +482,12 @@ def update_filename_and_move_files(
|
||||
old_filename = instance.filename
|
||||
old_source_path = instance.source_path
|
||||
move_original = False
|
||||
original_already_moved = False
|
||||
|
||||
old_archive_filename = instance.archive_filename
|
||||
old_archive_path = instance.archive_path
|
||||
move_archive = False
|
||||
archive_already_moved = False
|
||||
|
||||
candidate_filename = generate_filename(instance)
|
||||
if len(str(candidate_filename)) > Document.MAX_STORED_FILENAME_LENGTH:
|
||||
@@ -497,14 +508,23 @@ def update_filename_and_move_files(
|
||||
candidate_source_path.exists()
|
||||
and candidate_source_path != old_source_path
|
||||
):
|
||||
# Only fall back to unique search when there is an actual conflict
|
||||
new_filename = generate_unique_filename(instance)
|
||||
if not old_source_path.is_file() and _path_matches_checksum(
|
||||
candidate_source_path,
|
||||
instance.checksum,
|
||||
):
|
||||
new_filename = candidate_filename
|
||||
original_already_moved = True
|
||||
else:
|
||||
# Only fall back to unique search when there is an actual conflict
|
||||
new_filename = generate_unique_filename(instance)
|
||||
else:
|
||||
new_filename = candidate_filename
|
||||
|
||||
# Need to convert to string to be able to save it to the db
|
||||
instance.filename = str(new_filename)
|
||||
move_original = old_filename != instance.filename
|
||||
move_original = (
|
||||
old_filename != instance.filename and not original_already_moved
|
||||
)
|
||||
|
||||
if instance.has_archive_version:
|
||||
archive_candidate = generate_filename(instance, archive_filename=True)
|
||||
@@ -525,24 +545,38 @@ def update_filename_and_move_files(
|
||||
archive_candidate_path.exists()
|
||||
and archive_candidate_path != old_archive_path
|
||||
):
|
||||
new_archive_filename = generate_unique_filename(
|
||||
instance,
|
||||
archive_filename=True,
|
||||
)
|
||||
if not old_archive_path.is_file() and _path_matches_checksum(
|
||||
archive_candidate_path,
|
||||
instance.archive_checksum,
|
||||
):
|
||||
new_archive_filename = archive_candidate
|
||||
archive_already_moved = True
|
||||
else:
|
||||
new_archive_filename = generate_unique_filename(
|
||||
instance,
|
||||
archive_filename=True,
|
||||
)
|
||||
else:
|
||||
new_archive_filename = archive_candidate
|
||||
|
||||
instance.archive_filename = str(new_archive_filename)
|
||||
|
||||
move_archive = old_archive_filename != instance.archive_filename
|
||||
move_archive = (
|
||||
old_archive_filename != instance.archive_filename
|
||||
and not archive_already_moved
|
||||
)
|
||||
else:
|
||||
move_archive = False
|
||||
|
||||
if not move_original and not move_archive:
|
||||
# Just update modified. Also, don't save() here to prevent infinite recursion.
|
||||
Document.objects.filter(pk=instance.pk).update(
|
||||
modified=timezone.now(),
|
||||
)
|
||||
updates = {"modified": timezone.now()}
|
||||
if old_filename != instance.filename:
|
||||
updates["filename"] = instance.filename
|
||||
if old_archive_filename != instance.archive_filename:
|
||||
updates["archive_filename"] = instance.archive_filename
|
||||
|
||||
# Don't save() here to prevent infinite recursion.
|
||||
Document.objects.filter(pk=instance.pk).update(**updates)
|
||||
return
|
||||
|
||||
if move_original:
|
||||
@@ -932,10 +966,25 @@ def run_workflows(
|
||||
if not use_overrides:
|
||||
# limit title to 128 characters
|
||||
document.title = document.title[:128]
|
||||
# Make sure the filename and archive filename are accurate
|
||||
document.refresh_from_db(fields=["filename", "archive_filename"])
|
||||
# save first before setting tags
|
||||
document.save()
|
||||
# Save only the fields that workflow actions can set directly.
|
||||
# Deliberately excludes filename and archive_filename — those are
|
||||
# managed exclusively by update_filename_and_move_files via the
|
||||
# post_save signal. Writing stale in-memory values here would revert
|
||||
# a concurrent update_filename_and_move_files DB write, leaving the
|
||||
# DB pointing at the old path while the file is already at the new
|
||||
# one (see: https://github.com/paperless-ngx/paperless-ngx/issues/12386).
|
||||
# modified has auto_now=True but is not auto-added when update_fields
|
||||
# is specified, so it must be listed explicitly.
|
||||
document.save(
|
||||
update_fields=[
|
||||
"title",
|
||||
"correspondent",
|
||||
"document_type",
|
||||
"storage_path",
|
||||
"owner",
|
||||
"modified",
|
||||
],
|
||||
)
|
||||
document.tags.set(doc_tag_ids)
|
||||
|
||||
WorkflowRun.objects.create(
|
||||
|
||||
@@ -262,6 +262,50 @@ class TestBulkEditAPI(DirectoriesMixin, APITestCase):
|
||||
self.assertEqual(kwargs["add_custom_fields"], [self.cf1.id])
|
||||
self.assertEqual(kwargs["remove_custom_fields"], [self.cf2.id])
|
||||
|
||||
@mock.patch("documents.serialisers.bulk_edit.modify_custom_fields")
|
||||
def test_api_modify_custom_fields_documentlink_forbidden_for_unpermitted_target(
|
||||
self,
|
||||
m,
|
||||
) -> None:
|
||||
self.setup_mock(m, "modify_custom_fields")
|
||||
user = User.objects.create_user(username="doc-owner")
|
||||
user.user_permissions.add(Permission.objects.get(codename="change_document"))
|
||||
other_user = User.objects.create_user(username="other-user")
|
||||
source_doc = Document.objects.create(
|
||||
checksum="source",
|
||||
title="Source",
|
||||
owner=user,
|
||||
)
|
||||
target_doc = Document.objects.create(
|
||||
checksum="target",
|
||||
title="Target",
|
||||
owner=other_user,
|
||||
)
|
||||
doclink_field = CustomField.objects.create(
|
||||
name="doclink",
|
||||
data_type=CustomField.FieldDataType.DOCUMENTLINK,
|
||||
)
|
||||
|
||||
self.client.force_authenticate(user=user)
|
||||
|
||||
response = self.client.post(
|
||||
"/api/documents/bulk_edit/",
|
||||
json.dumps(
|
||||
{
|
||||
"documents": [source_doc.id],
|
||||
"method": "modify_custom_fields",
|
||||
"parameters": {
|
||||
"add_custom_fields": {doclink_field.id: [target_doc.id]},
|
||||
"remove_custom_fields": [],
|
||||
},
|
||||
},
|
||||
),
|
||||
content_type="application/json",
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
|
||||
m.assert_not_called()
|
||||
|
||||
@mock.patch("documents.serialisers.bulk_edit.modify_custom_fields")
|
||||
def test_api_modify_custom_fields_with_values(self, m) -> None:
|
||||
self.setup_mock(m, "modify_custom_fields")
|
||||
|
||||
@@ -6,6 +6,7 @@ from unittest.mock import ANY
|
||||
from django.contrib.auth.models import Permission
|
||||
from django.contrib.auth.models import User
|
||||
from django.test import override_settings
|
||||
from guardian.shortcuts import assign_perm
|
||||
from rest_framework import status
|
||||
from rest_framework.test import APITestCase
|
||||
|
||||
@@ -1140,6 +1141,102 @@ class TestCustomFieldsAPI(DirectoriesMixin, APITestCase):
|
||||
self.assertEqual(resp.status_code, status.HTTP_200_OK)
|
||||
self.assertEqual(doc5.custom_fields.first().value, [1])
|
||||
|
||||
def test_documentlink_patch_requires_change_permission_on_target_documents(
|
||||
self,
|
||||
) -> None:
|
||||
source_owner = User.objects.create_user(username="source-owner")
|
||||
source_owner.user_permissions.add(
|
||||
Permission.objects.get(codename="change_document"),
|
||||
)
|
||||
other_user = User.objects.create_user(username="other-user")
|
||||
|
||||
source_doc = Document.objects.create(
|
||||
title="Source",
|
||||
checksum="source",
|
||||
mime_type="application/pdf",
|
||||
owner=source_owner,
|
||||
)
|
||||
target_doc = Document.objects.create(
|
||||
title="Target",
|
||||
checksum="target",
|
||||
mime_type="application/pdf",
|
||||
owner=other_user,
|
||||
)
|
||||
custom_field_doclink = CustomField.objects.create(
|
||||
name="Test Custom Field Doc Link",
|
||||
data_type=CustomField.FieldDataType.DOCUMENTLINK,
|
||||
)
|
||||
|
||||
self.client.force_authenticate(user=source_owner)
|
||||
|
||||
resp = self.client.patch(
|
||||
f"/api/documents/{source_doc.id}/",
|
||||
data={
|
||||
"custom_fields": [
|
||||
{
|
||||
"field": custom_field_doclink.id,
|
||||
"value": [target_doc.id],
|
||||
},
|
||||
],
|
||||
},
|
||||
format="json",
|
||||
)
|
||||
|
||||
self.assertEqual(resp.status_code, status.HTTP_403_FORBIDDEN)
|
||||
self.assertEqual(
|
||||
CustomFieldInstance.objects.filter(field=custom_field_doclink).count(),
|
||||
0,
|
||||
)
|
||||
|
||||
def test_documentlink_patch_allowed_with_change_permission_on_target_documents(
|
||||
self,
|
||||
) -> None:
|
||||
source_owner = User.objects.create_user(username="source-owner")
|
||||
source_owner.user_permissions.add(
|
||||
Permission.objects.get(codename="change_document"),
|
||||
)
|
||||
other_user = User.objects.create_user(username="other-user")
|
||||
|
||||
source_doc = Document.objects.create(
|
||||
title="Source",
|
||||
checksum="source",
|
||||
mime_type="application/pdf",
|
||||
owner=source_owner,
|
||||
)
|
||||
target_doc = Document.objects.create(
|
||||
title="Target",
|
||||
checksum="target",
|
||||
mime_type="application/pdf",
|
||||
owner=other_user,
|
||||
)
|
||||
custom_field_doclink = CustomField.objects.create(
|
||||
name="Test Custom Field Doc Link",
|
||||
data_type=CustomField.FieldDataType.DOCUMENTLINK,
|
||||
)
|
||||
|
||||
assign_perm("change_document", source_owner, target_doc)
|
||||
self.client.force_authenticate(user=source_owner)
|
||||
|
||||
resp = self.client.patch(
|
||||
f"/api/documents/{source_doc.id}/",
|
||||
data={
|
||||
"custom_fields": [
|
||||
{
|
||||
"field": custom_field_doclink.id,
|
||||
"value": [target_doc.id],
|
||||
},
|
||||
],
|
||||
},
|
||||
format="json",
|
||||
)
|
||||
|
||||
self.assertEqual(resp.status_code, status.HTTP_200_OK)
|
||||
target_doc.refresh_from_db()
|
||||
self.assertEqual(
|
||||
target_doc.custom_fields.get(field=custom_field_doclink).value,
|
||||
[source_doc.id],
|
||||
)
|
||||
|
||||
def test_custom_field_filters(self) -> None:
|
||||
custom_field_string = CustomField.objects.create(
|
||||
name="Test Custom Field String",
|
||||
|
||||
@@ -702,6 +702,40 @@ class TestDocumentSearchApi(DirectoriesMixin, APITestCase):
|
||||
|
||||
self.assertEqual(correction, None)
|
||||
|
||||
def test_search_spelling_suggestion_suppressed_for_private_terms(self):
|
||||
owner = User.objects.create_user("owner")
|
||||
attacker = User.objects.create_user("attacker")
|
||||
attacker.user_permissions.add(
|
||||
Permission.objects.get(codename="view_document"),
|
||||
)
|
||||
|
||||
with AsyncWriter(index.open_index()) as writer:
|
||||
for i in range(55):
|
||||
private_doc = Document.objects.create(
|
||||
checksum=f"p{i}",
|
||||
pk=100 + i,
|
||||
title=f"Private Document {i + 1}",
|
||||
content=f"treasury document {i + 1}",
|
||||
owner=owner,
|
||||
)
|
||||
visible_doc = Document.objects.create(
|
||||
checksum=f"v{i}",
|
||||
pk=200 + i,
|
||||
title=f"Visible Document {i + 1}",
|
||||
content=f"public ledger {i + 1}",
|
||||
owner=attacker,
|
||||
)
|
||||
index.update_document(writer, private_doc)
|
||||
index.update_document(writer, visible_doc)
|
||||
|
||||
self.client.force_authenticate(user=attacker)
|
||||
|
||||
response = self.client.get("/api/documents/?query=treasurx")
|
||||
|
||||
self.assertEqual(response.status_code, status.HTTP_200_OK)
|
||||
self.assertEqual(response.data["count"], 0)
|
||||
self.assertIsNone(response.data["corrected_query"])
|
||||
|
||||
@mock.patch(
|
||||
"whoosh.searching.Searcher.correct_query",
|
||||
side_effect=Exception("Test error"),
|
||||
@@ -772,6 +806,60 @@ class TestDocumentSearchApi(DirectoriesMixin, APITestCase):
|
||||
self.assertEqual(results[0]["id"], d3.id)
|
||||
self.assertEqual(results[1]["id"], d1.id)
|
||||
|
||||
def test_search_more_like_requires_view_permission_on_seed_document(
|
||||
self,
|
||||
) -> None:
|
||||
"""
|
||||
GIVEN:
|
||||
- A user can search documents they own
|
||||
- Another user's private document exists with similar content
|
||||
WHEN:
|
||||
- The user requests more-like-this for the private seed document
|
||||
THEN:
|
||||
- The request is rejected
|
||||
"""
|
||||
owner = User.objects.create_user("owner")
|
||||
attacker = User.objects.create_user("attacker")
|
||||
attacker.user_permissions.add(
|
||||
Permission.objects.get(codename="view_document"),
|
||||
)
|
||||
|
||||
private_seed = Document.objects.create(
|
||||
title="private bank statement",
|
||||
content="quarterly treasury bank statement wire transfer",
|
||||
checksum="seed",
|
||||
owner=owner,
|
||||
pk=10,
|
||||
)
|
||||
visible_doc = Document.objects.create(
|
||||
title="attacker-visible match",
|
||||
content="quarterly treasury bank statement wire transfer summary",
|
||||
checksum="visible",
|
||||
owner=attacker,
|
||||
pk=11,
|
||||
)
|
||||
other_doc = Document.objects.create(
|
||||
title="unrelated",
|
||||
content="completely different topic",
|
||||
checksum="other",
|
||||
owner=attacker,
|
||||
pk=12,
|
||||
)
|
||||
|
||||
with AsyncWriter(index.open_index()) as writer:
|
||||
index.update_document(writer, private_seed)
|
||||
index.update_document(writer, visible_doc)
|
||||
index.update_document(writer, other_doc)
|
||||
|
||||
self.client.force_authenticate(user=attacker)
|
||||
|
||||
response = self.client.get(
|
||||
f"/api/documents/?more_like_id={private_seed.id}",
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
|
||||
self.assertEqual(response.content, b"Insufficient permissions.")
|
||||
|
||||
def test_search_filtering(self) -> None:
|
||||
t = Tag.objects.create(name="tag")
|
||||
t2 = Tag.objects.create(name="tag2")
|
||||
@@ -1356,6 +1444,83 @@ class TestDocumentSearchApi(DirectoriesMixin, APITestCase):
|
||||
self.assertEqual(results["custom_fields"][0]["id"], custom_field1.id)
|
||||
self.assertEqual(results["workflows"][0]["id"], workflow1.id)
|
||||
|
||||
def test_global_search_filters_owned_mail_objects(self) -> None:
|
||||
user1 = User.objects.create_user("mail-search-user")
|
||||
user2 = User.objects.create_user("other-mail-search-user")
|
||||
user1.user_permissions.add(
|
||||
Permission.objects.get(codename="view_mailaccount"),
|
||||
Permission.objects.get(codename="view_mailrule"),
|
||||
)
|
||||
|
||||
own_account = MailAccount.objects.create(
|
||||
name="bank owned account",
|
||||
username="owner@example.com",
|
||||
password="secret",
|
||||
imap_server="imap.owner.example.com",
|
||||
imap_port=993,
|
||||
imap_security=MailAccount.ImapSecurity.SSL,
|
||||
character_set="UTF-8",
|
||||
owner=user1,
|
||||
)
|
||||
other_account = MailAccount.objects.create(
|
||||
name="bank other account",
|
||||
username="other@example.com",
|
||||
password="secret",
|
||||
imap_server="imap.other.example.com",
|
||||
imap_port=993,
|
||||
imap_security=MailAccount.ImapSecurity.SSL,
|
||||
character_set="UTF-8",
|
||||
owner=user2,
|
||||
)
|
||||
unowned_account = MailAccount.objects.create(
|
||||
name="bank shared account",
|
||||
username="shared@example.com",
|
||||
password="secret",
|
||||
imap_server="imap.shared.example.com",
|
||||
imap_port=993,
|
||||
imap_security=MailAccount.ImapSecurity.SSL,
|
||||
character_set="UTF-8",
|
||||
)
|
||||
own_rule = MailRule.objects.create(
|
||||
name="bank owned rule",
|
||||
account=own_account,
|
||||
action=MailRule.MailAction.MOVE,
|
||||
owner=user1,
|
||||
)
|
||||
other_rule = MailRule.objects.create(
|
||||
name="bank other rule",
|
||||
account=other_account,
|
||||
action=MailRule.MailAction.MOVE,
|
||||
owner=user2,
|
||||
)
|
||||
unowned_rule = MailRule.objects.create(
|
||||
name="bank shared rule",
|
||||
account=unowned_account,
|
||||
action=MailRule.MailAction.MOVE,
|
||||
)
|
||||
|
||||
self.client.force_authenticate(user1)
|
||||
|
||||
response = self.client.get("/api/search/?query=bank")
|
||||
|
||||
self.assertEqual(response.status_code, status.HTTP_200_OK)
|
||||
self.assertCountEqual(
|
||||
[account["id"] for account in response.data["mail_accounts"]],
|
||||
[own_account.id, unowned_account.id],
|
||||
)
|
||||
self.assertCountEqual(
|
||||
[rule["id"] for rule in response.data["mail_rules"]],
|
||||
[own_rule.id, unowned_rule.id],
|
||||
)
|
||||
self.assertNotIn(
|
||||
other_account.id,
|
||||
[account["id"] for account in response.data["mail_accounts"]],
|
||||
)
|
||||
self.assertNotIn(
|
||||
other_rule.id,
|
||||
[rule["id"] for rule in response.data["mail_rules"]],
|
||||
)
|
||||
|
||||
def test_global_search_bad_request(self) -> None:
|
||||
"""
|
||||
WHEN:
|
||||
|
||||
@@ -86,11 +86,18 @@ class TestSystemStatus(APITestCase):
|
||||
"""
|
||||
response = self.client.get(self.ENDPOINT)
|
||||
self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)
|
||||
self.assertEqual(response["WWW-Authenticate"], "Token")
|
||||
normal_user = User.objects.create_user(username="normal_user")
|
||||
self.client.force_login(normal_user)
|
||||
response = self.client.get(self.ENDPOINT)
|
||||
self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
|
||||
|
||||
def test_system_status_with_bad_basic_auth_challenges(self) -> None:
|
||||
self.client.credentials(HTTP_AUTHORIZATION="Basic invalid")
|
||||
response = self.client.get(self.ENDPOINT)
|
||||
self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)
|
||||
self.assertEqual(response["WWW-Authenticate"], 'Basic realm="api"')
|
||||
|
||||
def test_system_status_container_detection(self) -> None:
|
||||
"""
|
||||
GIVEN:
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import datetime
|
||||
import hashlib
|
||||
import logging
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
@@ -204,6 +205,52 @@ class TestFileHandling(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
|
||||
)
|
||||
self.assertEqual(document.filename, "none/none.pdf")
|
||||
|
||||
@override_settings(FILENAME_FORMAT=None)
|
||||
def test_stale_save_recovers_already_moved_files(self) -> None:
|
||||
old_storage_path = StoragePath.objects.create(
|
||||
name="old-path",
|
||||
path="old/{{title}}",
|
||||
)
|
||||
new_storage_path = StoragePath.objects.create(
|
||||
name="new-path",
|
||||
path="new/{{title}}",
|
||||
)
|
||||
original_bytes = b"original"
|
||||
archive_bytes = b"archive"
|
||||
|
||||
doc = Document.objects.create(
|
||||
title="document",
|
||||
mime_type="application/pdf",
|
||||
checksum=hashlib.md5(original_bytes).hexdigest(),
|
||||
archive_checksum=hashlib.md5(archive_bytes).hexdigest(),
|
||||
filename="old/document.pdf",
|
||||
archive_filename="old/document.pdf",
|
||||
storage_path=old_storage_path,
|
||||
)
|
||||
create_source_path_directory(doc.source_path)
|
||||
doc.source_path.write_bytes(original_bytes)
|
||||
create_source_path_directory(doc.archive_path)
|
||||
doc.archive_path.write_bytes(archive_bytes)
|
||||
|
||||
stale_doc = Document.objects.get(pk=doc.pk)
|
||||
fresh_doc = Document.objects.get(pk=doc.pk)
|
||||
fresh_doc.storage_path = new_storage_path
|
||||
fresh_doc.save()
|
||||
doc.refresh_from_db()
|
||||
self.assertEqual(doc.filename, "new/document.pdf")
|
||||
self.assertEqual(doc.archive_filename, "new/document.pdf")
|
||||
|
||||
stale_doc.storage_path = new_storage_path
|
||||
stale_doc.save()
|
||||
|
||||
doc.refresh_from_db()
|
||||
self.assertEqual(doc.filename, "new/document.pdf")
|
||||
self.assertEqual(doc.archive_filename, "new/document.pdf")
|
||||
self.assertIsFile(doc.source_path)
|
||||
self.assertIsFile(doc.archive_path)
|
||||
self.assertIsNotFile(settings.ORIGINALS_DIR / "old" / "document.pdf")
|
||||
self.assertIsNotFile(settings.ARCHIVE_DIR / "old" / "document.pdf")
|
||||
|
||||
@override_settings(FILENAME_FORMAT="{correspondent}/{correspondent}")
|
||||
def test_document_delete(self) -> None:
|
||||
document = Document()
|
||||
|
||||
@@ -963,6 +963,64 @@ class TestWorkflows(
|
||||
self.assertEqual(Path(doc.filename), expected_filename)
|
||||
self.assertTrue(doc.source_path.is_file())
|
||||
|
||||
def test_workflow_document_updated_does_not_overwrite_filename(self) -> None:
|
||||
"""
|
||||
GIVEN:
|
||||
- A document whose filename has been updated in the DB by a concurrent
|
||||
bulk_update_documents task (simulating update_filename_and_move_files
|
||||
completing and writing the new filename to the DB)
|
||||
- A stale in-memory document instance still holding the old filename
|
||||
- An active DOCUMENT_UPDATED workflow
|
||||
WHEN:
|
||||
- run_workflows is called with the stale in-memory instance
|
||||
(as would happen in the second concurrent bulk_update_documents task)
|
||||
THEN:
|
||||
- The DB filename is NOT overwritten with the stale in-memory value
|
||||
(regression test for GH #12386 — the race window between
|
||||
refresh_from_db and document.save in run_workflows)
|
||||
"""
|
||||
trigger = WorkflowTrigger.objects.create(
|
||||
type=WorkflowTrigger.WorkflowTriggerType.DOCUMENT_UPDATED,
|
||||
)
|
||||
action = WorkflowAction.objects.create(
|
||||
type=WorkflowAction.WorkflowActionType.ASSIGNMENT,
|
||||
assign_title="Updated by workflow",
|
||||
)
|
||||
workflow = Workflow.objects.create(name="Race condition test workflow", order=0)
|
||||
workflow.triggers.add(trigger)
|
||||
workflow.actions.add(action)
|
||||
workflow.save()
|
||||
|
||||
doc = Document.objects.create(
|
||||
title="race condition test",
|
||||
mime_type="application/pdf",
|
||||
checksum="racecondition123",
|
||||
original_filename="old.pdf",
|
||||
filename="old/path/old.pdf",
|
||||
)
|
||||
|
||||
# Simulate BUD-1 completing update_filename_and_move_files:
|
||||
# the DB now holds the new filename while BUD-2's in-memory instance is stale.
|
||||
new_filename = "new/path/new.pdf"
|
||||
Document.global_objects.filter(pk=doc.pk).update(filename=new_filename)
|
||||
|
||||
# The stale instance still has filename="old/path/old.pdf" in memory.
|
||||
# Mock refresh_from_db so the stale value persists through run_workflows,
|
||||
# replicating the race window between refresh and save.
|
||||
# Mock update_filename_and_move_files to prevent file-not-found errors
|
||||
# since we are only testing DB state here.
|
||||
with (
|
||||
mock.patch(
|
||||
"documents.signals.handlers.update_filename_and_move_files",
|
||||
),
|
||||
mock.patch.object(Document, "refresh_from_db"),
|
||||
):
|
||||
run_workflows(WorkflowTrigger.WorkflowTriggerType.DOCUMENT_UPDATED, doc)
|
||||
|
||||
# The DB filename must not have been reverted to the stale old value.
|
||||
doc.refresh_from_db()
|
||||
self.assertEqual(doc.filename, new_filename)
|
||||
|
||||
def test_document_added_workflow(self) -> None:
|
||||
trigger = WorkflowTrigger.objects.create(
|
||||
type=WorkflowTrigger.WorkflowTriggerType.DOCUMENT_ADDED,
|
||||
|
||||
+33
-4
@@ -83,6 +83,7 @@ from rest_framework import serializers
|
||||
from rest_framework import status
|
||||
from rest_framework.decorators import action
|
||||
from rest_framework.exceptions import NotFound
|
||||
from rest_framework.exceptions import PermissionDenied
|
||||
from rest_framework.exceptions import ValidationError
|
||||
from rest_framework.filters import OrderingFilter
|
||||
from rest_framework.filters import SearchFilter
|
||||
@@ -1332,6 +1333,7 @@ class DocumentViewSet(
|
||||
methods=["get", "post", "delete"],
|
||||
detail=True,
|
||||
permission_classes=[PaperlessNotePermissions],
|
||||
pagination_class=None,
|
||||
filter_backends=[],
|
||||
)
|
||||
def notes(self, request, pk=None):
|
||||
@@ -1969,11 +1971,28 @@ class UnifiedSearchViewSet(DocumentViewSet):
|
||||
filtered_queryset = super().filter_queryset(queryset)
|
||||
|
||||
if self._is_search_request():
|
||||
from documents import index
|
||||
|
||||
if "query" in self.request.query_params:
|
||||
from documents import index
|
||||
|
||||
query_class = index.DelayedFullTextQuery
|
||||
elif "more_like_id" in self.request.query_params:
|
||||
try:
|
||||
more_like_doc_id = int(self.request.query_params["more_like_id"])
|
||||
more_like_doc = Document.objects.select_related("owner").get(
|
||||
pk=more_like_doc_id,
|
||||
)
|
||||
except (TypeError, ValueError, Document.DoesNotExist):
|
||||
raise PermissionDenied(_("Invalid more_like_id"))
|
||||
|
||||
if not has_perms_owner_aware(
|
||||
self.request.user,
|
||||
"view_document",
|
||||
more_like_doc,
|
||||
):
|
||||
raise PermissionDenied(_("Insufficient permissions."))
|
||||
|
||||
from documents import index
|
||||
|
||||
query_class = index.DelayedMoreLikeThisQuery
|
||||
else:
|
||||
raise ValueError
|
||||
@@ -2009,6 +2028,8 @@ class UnifiedSearchViewSet(DocumentViewSet):
|
||||
return response
|
||||
except NotFound:
|
||||
raise
|
||||
except PermissionDenied as e:
|
||||
return HttpResponseForbidden(str(e.detail))
|
||||
except Exception as e:
|
||||
logger.warning(f"An error occurred listing search results: {e!s}")
|
||||
return HttpResponseBadRequest(
|
||||
@@ -2947,13 +2968,21 @@ class GlobalSearchView(PassUserMixin):
|
||||
)
|
||||
groups = groups[:OBJECT_LIMIT]
|
||||
mail_rules = (
|
||||
MailRule.objects.filter(name__icontains=query)
|
||||
get_objects_for_user_owner_aware(
|
||||
request.user,
|
||||
"view_mailrule",
|
||||
MailRule,
|
||||
).filter(name__icontains=query)
|
||||
if request.user.has_perm("paperless_mail.view_mailrule")
|
||||
else []
|
||||
)
|
||||
mail_rules = mail_rules[:OBJECT_LIMIT]
|
||||
mail_accounts = (
|
||||
MailAccount.objects.filter(name__icontains=query)
|
||||
get_objects_for_user_owner_aware(
|
||||
request.user,
|
||||
"view_mailaccount",
|
||||
MailAccount,
|
||||
).filter(name__icontains=query)
|
||||
if request.user.has_perm("paperless_mail.view_mailaccount")
|
||||
else []
|
||||
)
|
||||
|
||||
@@ -83,3 +83,11 @@ class PaperlessBasicAuthentication(authentication.BasicAuthentication):
|
||||
raise exceptions.AuthenticationFailed("MFA required")
|
||||
|
||||
return user_tuple
|
||||
|
||||
def authenticate_header(self, request):
|
||||
auth_header = request.META.get("HTTP_AUTHORIZATION", "")
|
||||
if auth_header.lower().startswith("basic "):
|
||||
return super().authenticate_header(request)
|
||||
|
||||
# Still 401 for anonymous API access
|
||||
return authentication.TokenAuthentication.keyword
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
from typing import Final
|
||||
|
||||
__version__: Final[tuple[int, int, int]] = (2, 20, 11)
|
||||
__version__: Final[tuple[int, int, int]] = (2, 20, 13)
|
||||
# Version string like X.Y.Z
|
||||
__full_version_str__: Final[str] = ".".join(map(str, __version__))
|
||||
# Version string like X.Y
|
||||
|
||||
@@ -1,5 +1,8 @@
|
||||
from django.utils.translation import gettext as _
|
||||
from rest_framework import serializers
|
||||
from rest_framework.exceptions import PermissionDenied
|
||||
|
||||
from documents.permissions import has_perms_owner_aware
|
||||
from documents.serialisers import CorrespondentField
|
||||
from documents.serialisers import DocumentTypeField
|
||||
from documents.serialisers import OwnedObjectSerializer
|
||||
@@ -128,6 +131,18 @@ class MailRuleSerializer(OwnedObjectSerializer):
|
||||
|
||||
return attrs
|
||||
|
||||
def validate_account(self, account):
|
||||
if self.user is not None and has_perms_owner_aware(
|
||||
self.user,
|
||||
"change_mailaccount",
|
||||
account,
|
||||
):
|
||||
return account
|
||||
|
||||
raise PermissionDenied(
|
||||
_("Insufficient permissions."),
|
||||
)
|
||||
|
||||
def validate_maximum_age(self, value):
|
||||
if value > 36500: # ~100 years
|
||||
raise serializers.ValidationError("Maximum mail age is unreasonably large.")
|
||||
|
||||
@@ -632,6 +632,116 @@ class TestAPIMailRules(DirectoriesMixin, APITestCase):
|
||||
self.assertEqual(returned_rule1.name, "Updated Name 1")
|
||||
self.assertEqual(returned_rule1.action, MailRule.MailAction.DELETE)
|
||||
|
||||
def test_create_mail_rule_forbidden_for_unpermitted_account(self) -> None:
|
||||
other_user = User.objects.create_user(username="mail-owner")
|
||||
foreign_account = MailAccount.objects.create(
|
||||
name="ForeignEmail",
|
||||
username="username1",
|
||||
password="password1",
|
||||
imap_server="server.example.com",
|
||||
imap_port=443,
|
||||
imap_security=MailAccount.ImapSecurity.SSL,
|
||||
character_set="UTF-8",
|
||||
owner=other_user,
|
||||
)
|
||||
|
||||
response = self.client.post(
|
||||
self.ENDPOINT,
|
||||
data={
|
||||
"name": "Rule1",
|
||||
"account": foreign_account.pk,
|
||||
"folder": "INBOX",
|
||||
"filter_from": "from@example.com",
|
||||
"maximum_age": 30,
|
||||
"action": MailRule.MailAction.MARK_READ,
|
||||
"assign_title_from": MailRule.TitleSource.FROM_SUBJECT,
|
||||
"assign_correspondent_from": MailRule.CorrespondentSource.FROM_NOTHING,
|
||||
"order": 0,
|
||||
"attachment_type": MailRule.AttachmentProcessing.ATTACHMENTS_ONLY,
|
||||
},
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
|
||||
self.assertEqual(MailRule.objects.count(), 0)
|
||||
|
||||
def test_create_mail_rule_allowed_for_granted_account_change_permission(
|
||||
self,
|
||||
) -> None:
|
||||
other_user = User.objects.create_user(username="mail-owner")
|
||||
foreign_account = MailAccount.objects.create(
|
||||
name="ForeignEmail",
|
||||
username="username1",
|
||||
password="password1",
|
||||
imap_server="server.example.com",
|
||||
imap_port=443,
|
||||
imap_security=MailAccount.ImapSecurity.SSL,
|
||||
character_set="UTF-8",
|
||||
owner=other_user,
|
||||
)
|
||||
assign_perm("change_mailaccount", self.user, foreign_account)
|
||||
|
||||
response = self.client.post(
|
||||
self.ENDPOINT,
|
||||
data={
|
||||
"name": "Rule1",
|
||||
"account": foreign_account.pk,
|
||||
"folder": "INBOX",
|
||||
"filter_from": "from@example.com",
|
||||
"maximum_age": 30,
|
||||
"action": MailRule.MailAction.MARK_READ,
|
||||
"assign_title_from": MailRule.TitleSource.FROM_SUBJECT,
|
||||
"assign_correspondent_from": MailRule.CorrespondentSource.FROM_NOTHING,
|
||||
"order": 0,
|
||||
"attachment_type": MailRule.AttachmentProcessing.ATTACHMENTS_ONLY,
|
||||
},
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
|
||||
self.assertEqual(MailRule.objects.get().account, foreign_account)
|
||||
|
||||
def test_update_mail_rule_forbidden_for_unpermitted_account(self) -> None:
|
||||
own_account = MailAccount.objects.create(
|
||||
name="Email1",
|
||||
username="username1",
|
||||
password="password1",
|
||||
imap_server="server.example.com",
|
||||
imap_port=443,
|
||||
imap_security=MailAccount.ImapSecurity.SSL,
|
||||
character_set="UTF-8",
|
||||
)
|
||||
other_user = User.objects.create_user(username="mail-owner")
|
||||
foreign_account = MailAccount.objects.create(
|
||||
name="ForeignEmail",
|
||||
username="username2",
|
||||
password="password2",
|
||||
imap_server="server.example.com",
|
||||
imap_port=443,
|
||||
imap_security=MailAccount.ImapSecurity.SSL,
|
||||
character_set="UTF-8",
|
||||
owner=other_user,
|
||||
)
|
||||
rule1 = MailRule.objects.create(
|
||||
name="Rule1",
|
||||
account=own_account,
|
||||
folder="INBOX",
|
||||
filter_from="from@example.com",
|
||||
maximum_age=30,
|
||||
action=MailRule.MailAction.MARK_READ,
|
||||
assign_title_from=MailRule.TitleSource.FROM_SUBJECT,
|
||||
assign_correspondent_from=MailRule.CorrespondentSource.FROM_NOTHING,
|
||||
order=0,
|
||||
attachment_type=MailRule.AttachmentProcessing.ATTACHMENTS_ONLY,
|
||||
)
|
||||
|
||||
response = self.client.patch(
|
||||
f"{self.ENDPOINT}{rule1.pk}/",
|
||||
data={"account": foreign_account.pk},
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
|
||||
rule1.refresh_from_db()
|
||||
self.assertEqual(rule1.account, own_account)
|
||||
|
||||
def test_get_mail_rules_owner_aware(self) -> None:
|
||||
"""
|
||||
GIVEN:
|
||||
|
||||
Reference in New Issue
Block a user