Merge branch 'release/v2.20.x'

This commit is contained in:
shamoon
2026-02-16 07:24:37 -08:00
9 changed files with 496 additions and 63 deletions

View File

@@ -15,7 +15,3 @@ else
echo "Unknown user."
exit 1
fi
er "$@"
elif [[ $(id -un) == "paperless" ]]; then
s6-setuidgid paperless python3 manage.py document_create_classifier "$@"
fi

View File

@@ -431,8 +431,10 @@ This allows for complex logic to be included in the format, including [logical s
and [filters](https://jinja.palletsprojects.com/en/3.1.x/templates/#id11) to manipulate the [variables](#filename-format-variables)
provided. The template is provided as a string, potentially multiline, and rendered into a single line.
In addition, the entire Document instance is available to be utilized in a more advanced way, as well as some variables which only make sense to be accessed
with more complex logic.
In addition, a limited `document` object is available for advanced templates.
This object includes common metadata fields such as `id`, `pk`, `title`, `content`, `page_count`, `created`, `added`, `modified`, `mime_type`,
`checksum`, `archive_checksum`, `archive_serial_number`, `filename`, `archive_filename`, and `original_filename`.
Related values are available as nested objects with limited fields, for example document.correspondent.name, etc.
#### Custom Jinja2 Filters

View File

@@ -364,7 +364,7 @@ export abstract class ManagementListComponent<T extends MatchingModel>
backdrop: 'static',
})
modal.componentInstance.title = $localize`Confirm delete`
modal.componentInstance.messageBold = $localize`This operation will permanently delete all objects.`
modal.componentInstance.messageBold = $localize`This operation will permanently delete the selected ${this.typeNamePlural}.`
modal.componentInstance.message = $localize`This operation cannot be undone.`
modal.componentInstance.btnClass = 'btn-danger'
modal.componentInstance.btnCaption = $localize`Proceed`

View File

@@ -2,10 +2,17 @@ from django.contrib.auth.models import Group
from django.contrib.auth.models import Permission
from django.contrib.auth.models import User
from django.contrib.contenttypes.models import ContentType
from django.db.models import Count
from django.db.models import IntegerField
from django.db.models import OuterRef
from django.db.models import Q
from django.db.models import QuerySet
from django.db.models import Subquery
from django.db.models.functions import Cast
from django.db.models.functions import Coalesce
from guardian.core import ObjectPermissionChecker
from guardian.models import GroupObjectPermission
from guardian.models import UserObjectPermission
from guardian.shortcuts import assign_perm
from guardian.shortcuts import get_objects_for_user
from guardian.shortcuts import get_users_with_perms
@@ -129,23 +136,96 @@ def set_permissions_for_object(permissions: dict, object, *, merge: bool = False
)
def _permitted_document_ids(user):
"""
Return a queryset of document IDs the user may view, limited to non-deleted
documents. This intentionally avoids ``get_objects_for_user`` to keep the
subquery small and index-friendly.
"""
base_docs = Document.objects.filter(deleted_at__isnull=True).only("id", "owner")
if user is None or not getattr(user, "is_authenticated", False):
# Just Anonymous user e.g. for drf-spectacular
return base_docs.filter(owner__isnull=True).values_list("id", flat=True)
if getattr(user, "is_superuser", False):
return base_docs.values_list("id", flat=True)
document_ct = ContentType.objects.get_for_model(Document)
perm_filter = {
"permission__codename": "view_document",
"permission__content_type": document_ct,
}
user_perm_docs = (
UserObjectPermission.objects.filter(user=user, **perm_filter)
.annotate(object_pk_int=Cast("object_pk", IntegerField()))
.values_list("object_pk_int", flat=True)
)
group_perm_docs = (
GroupObjectPermission.objects.filter(group__user=user, **perm_filter)
.annotate(object_pk_int=Cast("object_pk", IntegerField()))
.values_list("object_pk_int", flat=True)
)
permitted_documents = user_perm_docs.union(group_perm_docs)
return base_docs.filter(
Q(owner=user) | Q(owner__isnull=True) | Q(id__in=permitted_documents),
).values_list("id", flat=True)
def get_document_count_filter_for_user(user):
"""
Return the Q object used to filter document counts for the given user.
The filter is expressed as an ``id__in`` against a small subquery of permitted
document IDs to keep the generated SQL simple and avoid large OR clauses.
"""
if user is None or not getattr(user, "is_authenticated", False):
return Q(documents__deleted_at__isnull=True, documents__owner__isnull=True)
if getattr(user, "is_superuser", False):
# Superuser: no permission filtering needed
return Q(documents__deleted_at__isnull=True)
return Q(
documents__deleted_at__isnull=True,
documents__id__in=get_objects_for_user_owner_aware(
user,
"documents.view_document",
Document,
).values_list("id", flat=True),
permitted_ids = _permitted_document_ids(user)
return Q(documents__id__in=permitted_ids)
def annotate_document_count_for_related_queryset(
queryset,
through_model,
related_object_field: str,
target_field: str = "document_id",
user=None,
):
"""
Annotate a queryset with permissions-aware document counts using a subquery
against a relation table.
Args:
queryset: base queryset to annotate (must contain pk)
through_model: model representing the relation (e.g., Document.tags.through
or CustomFieldInstance)
source_field: field on the relation pointing back to queryset pk
target_field: field on the relation pointing to Document id
user: the user for whom to filter permitted document ids
"""
permitted_ids = _permitted_document_ids(user)
counts = (
through_model.objects.filter(
**{
related_object_field: OuterRef("pk"),
f"{target_field}__in": permitted_ids,
},
)
.values(related_object_field)
.annotate(c=Count(target_field))
.values("c")
)
return queryset.annotate(document_count=Coalesce(Subquery(counts[:1]), 0))
def get_objects_for_user_owner_aware(user, perms, Model) -> QuerySet:

View File

@@ -6,6 +6,7 @@ import re
from datetime import datetime
from decimal import Decimal
from typing import TYPE_CHECKING
from typing import Any
from typing import Literal
import magic
@@ -73,6 +74,7 @@ from documents.models import WorkflowTrigger
from documents.parsers import is_mime_type_supported
from documents.permissions import get_document_count_filter_for_user
from documents.permissions import get_groups_with_only_permission
from documents.permissions import get_objects_for_user_owner_aware
from documents.permissions import set_permissions_for_object
from documents.regex import validate_regex_pattern
from documents.templating.filepath import validate_filepath_template_and_render
@@ -713,6 +715,9 @@ class StoragePathField(serializers.PrimaryKeyRelatedField):
class CustomFieldSerializer(serializers.ModelSerializer):
def __init__(self, *args, **kwargs):
# Ignore args passed by permissions mixin
kwargs.pop("user", None)
kwargs.pop("full_perms", None)
context = kwargs.get("context")
self.api_version = int(
context.get("request").version
@@ -2750,8 +2755,22 @@ class StoragePathTestSerializer(SerializerWithPerms):
)
document = serializers.PrimaryKeyRelatedField(
queryset=Document.objects.all(),
queryset=Document.objects.none(),
required=True,
label="Document",
write_only=True,
)
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
request = self.context.get("request")
user = getattr(request, "user", None) if request else None
if user is not None and user.is_authenticated:
document_field = self.fields.get("document")
if not isinstance(document_field, serializers.PrimaryKeyRelatedField):
return
document_field.queryset = get_objects_for_user_owner_aware(
user,
"documents.view_document",
Document,
)

View File

@@ -193,6 +193,52 @@ def get_basic_metadata_context(
}
def get_safe_document_context(
document: Document,
tags: Iterable[Tag],
) -> dict[str, object]:
"""
Build a document context object to avoid supplying entire model instance.
"""
return {
"id": document.pk,
"pk": document.pk,
"title": document.title,
"content": document.content,
"page_count": document.page_count,
"created": document.created,
"added": document.added,
"modified": document.modified,
"archive_serial_number": document.archive_serial_number,
"mime_type": document.mime_type,
"checksum": document.checksum,
"archive_checksum": document.archive_checksum,
"filename": document.filename,
"archive_filename": document.archive_filename,
"original_filename": document.original_filename,
"owner": {"username": document.owner.username, "id": document.owner.id}
if document.owner
else None,
"tags": [{"name": tag.name, "id": tag.id} for tag in tags],
"correspondent": (
{"name": document.correspondent.name, "id": document.correspondent.id}
if document.correspondent
else None
),
"document_type": (
{"name": document.document_type.name, "id": document.document_type.id}
if document.document_type
else None
),
"storage_path": {
"path": document.storage_path.path,
"id": document.storage_path.id,
}
if document.storage_path
else None,
}
def get_tags_context(tags: Iterable[Tag]) -> dict[str, str | list[str]]:
"""
Given an Iterable of tags, constructs some context from them for usage
@@ -303,7 +349,7 @@ def validate_filepath_template_and_render(
# Build the context dictionary
context = (
{"document": document}
{"document": get_safe_document_context(document, tags=tags_list)}
| get_basic_metadata_context(document, no_value_default=NO_VALUE_PLACEHOLDER)
| get_creation_date_context(document)
| get_added_date_context(document)

View File

@@ -5,10 +5,13 @@ from unittest import mock
from django.contrib.auth.models import Permission
from django.contrib.auth.models import User
from django.test import override_settings
from guardian.shortcuts import assign_perm
from rest_framework import status
from rest_framework.test import APITestCase
from documents.models import Correspondent
from documents.models import CustomField
from documents.models import CustomFieldInstance
from documents.models import Document
from documents.models import DocumentType
from documents.models import StoragePath
@@ -398,6 +401,292 @@ class TestApiStoragePaths(DirectoriesMixin, APITestCase):
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(response.data, "folder/Something")
def test_test_storage_path_requires_document_view_permission(self) -> None:
owner = User.objects.create_user(username="owner")
unprivileged = User.objects.create_user(username="unprivileged")
document = Document.objects.create(
mime_type="application/pdf",
owner=owner,
title="Sensitive",
checksum="123",
)
self.client.force_authenticate(user=unprivileged)
response = self.client.post(
f"{self.ENDPOINT}test/",
json.dumps(
{
"document": document.id,
"path": "path/{{ title }}",
},
),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
self.assertIn("document", response.data)
def test_test_storage_path_allows_shared_document_view_permission(self) -> None:
owner = User.objects.create_user(username="owner")
viewer = User.objects.create_user(username="viewer")
document = Document.objects.create(
mime_type="application/pdf",
owner=owner,
title="Shared",
checksum="123",
)
assign_perm("view_document", viewer, document)
self.client.force_authenticate(user=viewer)
response = self.client.post(
f"{self.ENDPOINT}test/",
json.dumps(
{
"document": document.id,
"path": "path/{{ title }}",
},
),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(response.data, "path/Shared")
def test_test_storage_path_exposes_basic_document_context_but_not_sensitive_owner_data(
self,
) -> None:
owner = User.objects.create_user(
username="owner",
password="password",
email="owner@example.com",
)
document = Document.objects.create(
mime_type="application/pdf",
owner=owner,
title="Document",
content="Top secret content",
page_count=2,
checksum="123",
)
self.client.force_authenticate(user=owner)
response = self.client.post(
f"{self.ENDPOINT}test/",
json.dumps(
{
"document": document.id,
"path": "{{ document.owner.username }}",
},
),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(response.data, "owner")
for expression, expected in (
("{{ document.content }}", "Top secret content"),
("{{ document.id }}", str(document.id)),
("{{ document.page_count }}", "2"),
):
response = self.client.post(
f"{self.ENDPOINT}test/",
json.dumps(
{
"document": document.id,
"path": expression,
},
),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(response.data, expected)
for expression in (
"{{ document.owner.password }}",
"{{ document.owner.email }}",
):
response = self.client.post(
f"{self.ENDPOINT}test/",
json.dumps(
{
"document": document.id,
"path": expression,
},
),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertIsNone(response.data)
def test_test_storage_path_includes_related_objects_for_visible_document(
self,
) -> None:
owner = User.objects.create_user(username="owner")
viewer = User.objects.create_user(username="viewer")
private_correspondent = Correspondent.objects.create(
name="Private Correspondent",
owner=owner,
)
document = Document.objects.create(
mime_type="application/pdf",
owner=owner,
correspondent=private_correspondent,
title="Document",
checksum="123",
)
assign_perm("view_document", viewer, document)
self.client.force_authenticate(user=viewer)
response = self.client.post(
f"{self.ENDPOINT}test/",
json.dumps(
{
"document": document.id,
"path": "{{ correspondent }}",
},
),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(response.data, "Private Correspondent")
response = self.client.post(
f"{self.ENDPOINT}test/",
json.dumps(
{
"document": document.id,
"path": (
"{{ document.correspondent.name if document.correspondent else 'none' }}"
),
},
),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(response.data, "Private Correspondent")
def test_test_storage_path_superuser_can_view_private_related_objects(self) -> None:
owner = User.objects.create_user(username="owner")
private_correspondent = Correspondent.objects.create(
name="Private Correspondent",
owner=owner,
)
document = Document.objects.create(
mime_type="application/pdf",
owner=owner,
correspondent=private_correspondent,
title="Document",
checksum="123",
)
response = self.client.post(
f"{self.ENDPOINT}test/",
json.dumps(
{
"document": document.id,
"path": (
"{{ document.correspondent.name if document.correspondent else 'none' }}"
),
},
),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(response.data, "Private Correspondent")
def test_test_storage_path_includes_doc_type_storage_path_and_tags(
self,
) -> None:
owner = User.objects.create_user(username="owner")
viewer = User.objects.create_user(username="viewer")
private_document_type = DocumentType.objects.create(
name="Private Type",
owner=owner,
)
private_storage_path = StoragePath.objects.create(
name="Private Storage Path",
path="private/path",
owner=owner,
)
private_tag = Tag.objects.create(
name="Private Tag",
owner=owner,
)
document = Document.objects.create(
mime_type="application/pdf",
owner=owner,
document_type=private_document_type,
storage_path=private_storage_path,
title="Document",
checksum="123",
)
document.tags.add(private_tag)
assign_perm("view_document", viewer, document)
self.client.force_authenticate(user=viewer)
response = self.client.post(
f"{self.ENDPOINT}test/",
json.dumps(
{
"document": document.id,
"path": (
"{{ document.document_type.name if document.document_type else 'none' }}/"
"{{ document.storage_path.path if document.storage_path else 'none' }}/"
"{{ document.tags[0].name if document.tags else 'none' }}"
),
},
),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(response.data, "Private Type/private/path/Private Tag")
response = self.client.post(
f"{self.ENDPOINT}test/",
json.dumps(
{
"document": document.id,
"path": "{{ document_type }}/{{ tag_list if tag_list else 'none' }}",
},
),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(response.data, "Private Type/Private Tag")
def test_test_storage_path_includes_custom_fields_for_visible_document(
self,
) -> None:
owner = User.objects.create_user(username="owner")
viewer = User.objects.create_user(username="viewer")
document = Document.objects.create(
mime_type="application/pdf",
owner=owner,
title="Document",
checksum="123",
)
custom_field = CustomField.objects.create(
name="Secret Number",
data_type=CustomField.FieldDataType.INT,
)
CustomFieldInstance.objects.create(
document=document,
field=custom_field,
value_int=42,
)
assign_perm("view_document", viewer, document)
self.client.force_authenticate(user=viewer)
response = self.client.post(
f"{self.ENDPOINT}test/",
json.dumps(
{
"document": document.id,
"path": "{{ custom_fields | get_cf_value('Secret Number', 'none') }}",
},
),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(response.data, "42")
class TestBulkEditObjects(APITestCase):
# See test_api_permissions.py for bulk tests on permissions

View File

@@ -1382,11 +1382,11 @@ class TestFilenameGeneration(DirectoriesMixin, TestCase):
def test_template_with_security(self):
"""
GIVEN:
- Filename format with one or more undefined variables
- Filename format with an unavailable document attribute
WHEN:
- Filepath for a document with this format is called
THEN:
- The first undefined variable is logged
- The missing attribute is logged
- The default format is used
"""
doc_a = Document.objects.create(
@@ -1408,7 +1408,7 @@ class TestFilenameGeneration(DirectoriesMixin, TestCase):
self.assertEqual(len(capture.output), 1)
self.assertEqual(
capture.output[0],
"WARNING:paperless.templating:Template attempted restricted operation: <bound method Model.save of <Document: 2020-06-25 Does Matter>> is not safely callable",
"ERROR:paperless.templating:Template variable error: 'dict object' has no attribute 'save'",
)
def test_template_with_custom_fields(self):

View File

@@ -32,7 +32,6 @@ from django.db.models import Count
from django.db.models import IntegerField
from django.db.models import Max
from django.db.models import Model
from django.db.models import Q
from django.db.models import Sum
from django.db.models import When
from django.db.models.functions import Length
@@ -128,6 +127,7 @@ from documents.matching import match_storage_paths
from documents.matching import match_tags
from documents.models import Correspondent
from documents.models import CustomField
from documents.models import CustomFieldInstance
from documents.models import Document
from documents.models import DocumentType
from documents.models import Note
@@ -147,6 +147,7 @@ from documents.permissions import PaperlessAdminPermissions
from documents.permissions import PaperlessNotePermissions
from documents.permissions import PaperlessObjectPermissions
from documents.permissions import ViewDocumentsPermissions
from documents.permissions import annotate_document_count_for_related_queryset
from documents.permissions import get_document_count_filter_for_user
from documents.permissions import get_objects_for_user_owner_aware
from documents.permissions import has_perms_owner_aware
@@ -370,22 +371,37 @@ class PermissionsAwareDocumentCountMixin(BulkPermissionMixin, PassUserMixin):
Mixin to add document count to queryset, permissions-aware if needed
"""
# Default is simple relation path, override for through-table/count specialization.
document_count_through = None
document_count_source_field = None
def get_document_count_filter(self):
request = getattr(self, "request", None)
user = getattr(request, "user", None) if request else None
return get_document_count_filter_for_user(user)
def get_queryset(self):
base_qs = super().get_queryset()
# Use optimized through-table counting when configured.
if self.document_count_through:
user = getattr(getattr(self, "request", None), "user", None)
return annotate_document_count_for_related_queryset(
base_qs,
through_model=self.document_count_through,
related_object_field=self.document_count_source_field,
user=user,
)
# Fallback: simple Count on relation with permission filter.
filter = self.get_document_count_filter()
return (
super()
.get_queryset()
.annotate(document_count=Count("documents", filter=filter))
return base_qs.annotate(
document_count=Count("documents", filter=filter),
)
@extend_schema_view(**generate_object_with_permissions_schema(CorrespondentSerializer))
class CorrespondentViewSet(ModelViewSet, PermissionsAwareDocumentCountMixin):
class CorrespondentViewSet(PermissionsAwareDocumentCountMixin, ModelViewSet):
model = Correspondent
queryset = Correspondent.objects.select_related("owner").order_by(Lower("name"))
@@ -422,8 +438,10 @@ class CorrespondentViewSet(ModelViewSet, PermissionsAwareDocumentCountMixin):
@extend_schema_view(**generate_object_with_permissions_schema(TagSerializer))
class TagViewSet(ModelViewSet, PermissionsAwareDocumentCountMixin):
class TagViewSet(PermissionsAwareDocumentCountMixin, ModelViewSet):
model = Tag
document_count_through = Document.tags.through
document_count_source_field = "tag_id"
queryset = Tag.objects.select_related("owner").order_by(
Lower("name"),
@@ -466,12 +484,16 @@ class TagViewSet(ModelViewSet, PermissionsAwareDocumentCountMixin):
descendant_pks = {pk for tag in all_tags for pk in tag.get_descendants_pks()}
if descendant_pks:
filter_q = self.get_document_count_filter()
user = getattr(getattr(self, "request", None), "user", None)
children_source = list(
annotate_document_count_for_related_queryset(
Tag.objects.filter(pk__in=descendant_pks | {t.pk for t in all_tags})
.select_related("owner")
.annotate(document_count=Count("documents", filter=filter_q))
.order_by(*ordering),
through_model=self.document_count_through,
related_object_field=self.document_count_source_field,
user=user,
),
)
else:
children_source = all_tags
@@ -498,7 +520,7 @@ class TagViewSet(ModelViewSet, PermissionsAwareDocumentCountMixin):
@extend_schema_view(**generate_object_with_permissions_schema(DocumentTypeSerializer))
class DocumentTypeViewSet(ModelViewSet, PermissionsAwareDocumentCountMixin):
class DocumentTypeViewSet(PermissionsAwareDocumentCountMixin, ModelViewSet):
model = DocumentType
queryset = DocumentType.objects.select_related("owner").order_by(Lower("name"))
@@ -2344,7 +2366,7 @@ class BulkDownloadView(GenericAPIView):
@extend_schema_view(**generate_object_with_permissions_schema(StoragePathSerializer))
class StoragePathViewSet(ModelViewSet, PermissionsAwareDocumentCountMixin):
class StoragePathViewSet(PermissionsAwareDocumentCountMixin, ModelViewSet):
model = StoragePath
queryset = StoragePath.objects.select_related("owner").order_by(
@@ -2389,7 +2411,10 @@ class StoragePathViewSet(ModelViewSet, PermissionsAwareDocumentCountMixin):
"""
Test storage path against a document
"""
serializer = StoragePathTestSerializer(data=request.data)
serializer = StoragePathTestSerializer(
data=request.data,
context={"request": request},
)
serializer.is_valid(raise_exception=True)
document = serializer.validated_data.get("document")
@@ -2861,7 +2886,7 @@ class WorkflowViewSet(ModelViewSet):
)
class CustomFieldViewSet(ModelViewSet):
class CustomFieldViewSet(PermissionsAwareDocumentCountMixin, ModelViewSet):
permission_classes = (IsAuthenticated, PaperlessObjectPermissions)
serializer_class = CustomFieldSerializer
@@ -2873,35 +2898,11 @@ class CustomFieldViewSet(ModelViewSet):
filterset_class = CustomFieldFilterSet
model = CustomField
document_count_through = CustomFieldInstance
document_count_source_field = "field_id"
queryset = CustomField.objects.all().order_by("-created")
def get_queryset(self):
filter = (
Q(fields__document__deleted_at__isnull=True)
if self.request.user is None or self.request.user.is_superuser
else (
Q(
fields__document__deleted_at__isnull=True,
fields__document__id__in=get_objects_for_user_owner_aware(
self.request.user,
"documents.view_document",
Document,
).values_list("id", flat=True),
)
)
)
return (
super()
.get_queryset()
.annotate(
document_count=Count(
"fields",
filter=filter,
),
)
)
@extend_schema_view(
get=extend_schema(