diff --git a/src/documents/serialisers.py b/src/documents/serialisers.py index 647e0e8b5..ea8cc70b6 100644 --- a/src/documents/serialisers.py +++ b/src/documents/serialisers.py @@ -853,6 +853,25 @@ class ReadWriteSerializerMethodField(serializers.SerializerMethodField): return {self.field_name: data} +def validate_documentlink_targets(user, doc_ids): + if Document.objects.filter(id__in=doc_ids).count() != len(doc_ids): + raise serializers.ValidationError( + "Some documents in value don't exist or were specified twice.", + ) + + if user is None: + return + + target_documents = Document.objects.filter(id__in=doc_ids).select_related("owner") + if not all( + has_perms_owner_aware(user, "change_document", document) + for document in target_documents + ): + raise PermissionDenied( + _("Insufficient permissions."), + ) + + class CustomFieldInstanceSerializer(serializers.ModelSerializer): field = serializers.PrimaryKeyRelatedField(queryset=CustomField.objects.all()) value = ReadWriteSerializerMethodField(allow_null=True) @@ -943,12 +962,11 @@ class CustomFieldInstanceSerializer(serializers.ModelSerializer): "Value must be a list", ) doc_ids = data["value"] - if Document.objects.filter(id__in=doc_ids).count() != len( - data["value"], - ): - raise serializers.ValidationError( - "Some documents in value don't exist or were specified twice.", - ) + request = self.context.get("request") + validate_documentlink_targets( + getattr(request, "user", None) if request is not None else None, + doc_ids, + ) return data @@ -1498,6 +1516,19 @@ class BulkEditSerializer( f"Some custom fields in {name} don't exist or were specified twice.", ) + if isinstance(custom_fields, dict): + custom_field_map = CustomField.objects.in_bulk(ids) + for raw_field_id, value in custom_fields.items(): + field = custom_field_map.get(int(raw_field_id)) + if ( + field is not None + and field.data_type == CustomField.FieldDataType.DOCUMENTLINK + and value is not None + ): + if not isinstance(value, list): + raise serializers.ValidationError("Value must be a list") + validate_documentlink_targets(self.user, value) + def validate_method(self, method): if method == "set_correspondent": return bulk_edit.set_correspondent diff --git a/src/documents/tests/test_api_bulk_edit.py b/src/documents/tests/test_api_bulk_edit.py index be7a81cd4..8400372a7 100644 --- a/src/documents/tests/test_api_bulk_edit.py +++ b/src/documents/tests/test_api_bulk_edit.py @@ -262,6 +262,50 @@ class TestBulkEditAPI(DirectoriesMixin, APITestCase): self.assertEqual(kwargs["add_custom_fields"], [self.cf1.id]) self.assertEqual(kwargs["remove_custom_fields"], [self.cf2.id]) + @mock.patch("documents.serialisers.bulk_edit.modify_custom_fields") + def test_api_modify_custom_fields_documentlink_forbidden_for_unpermitted_target( + self, + m, + ): + self.setup_mock(m, "modify_custom_fields") + user = User.objects.create_user(username="doc-owner") + user.user_permissions.add(Permission.objects.get(codename="change_document")) + other_user = User.objects.create_user(username="other-user") + source_doc = Document.objects.create( + checksum="source", + title="Source", + owner=user, + ) + target_doc = Document.objects.create( + checksum="target", + title="Target", + owner=other_user, + ) + doclink_field = CustomField.objects.create( + name="doclink", + data_type=CustomField.FieldDataType.DOCUMENTLINK, + ) + + self.client.force_authenticate(user=user) + + response = self.client.post( + "/api/documents/bulk_edit/", + json.dumps( + { + "documents": [source_doc.id], + "method": "modify_custom_fields", + "parameters": { + "add_custom_fields": {doclink_field.id: [target_doc.id]}, + "remove_custom_fields": [], + }, + }, + ), + content_type="application/json", + ) + + self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) + m.assert_not_called() + @mock.patch("documents.serialisers.bulk_edit.modify_custom_fields") def test_api_modify_custom_fields_with_values(self, m): self.setup_mock(m, "modify_custom_fields") diff --git a/src/documents/tests/test_api_custom_fields.py b/src/documents/tests/test_api_custom_fields.py index 8cc8f2cb2..998bc445a 100644 --- a/src/documents/tests/test_api_custom_fields.py +++ b/src/documents/tests/test_api_custom_fields.py @@ -6,6 +6,7 @@ from unittest.mock import ANY from django.contrib.auth.models import Permission from django.contrib.auth.models import User from django.test import override_settings +from guardian.shortcuts import assign_perm from rest_framework import status from rest_framework.test import APITestCase @@ -1247,6 +1248,100 @@ class TestCustomFieldsAPI(DirectoriesMixin, APITestCase): self.assertEqual(resp.status_code, status.HTTP_200_OK) self.assertEqual(doc5.custom_fields.first().value, [1]) + def test_documentlink_patch_requires_change_permission_on_target_documents(self): + source_owner = User.objects.create_user(username="source-owner") + source_owner.user_permissions.add( + Permission.objects.get(codename="change_document"), + ) + other_user = User.objects.create_user(username="other-user") + + source_doc = Document.objects.create( + title="Source", + checksum="source", + mime_type="application/pdf", + owner=source_owner, + ) + target_doc = Document.objects.create( + title="Target", + checksum="target", + mime_type="application/pdf", + owner=other_user, + ) + custom_field_doclink = CustomField.objects.create( + name="Test Custom Field Doc Link", + data_type=CustomField.FieldDataType.DOCUMENTLINK, + ) + + self.client.force_authenticate(user=source_owner) + + resp = self.client.patch( + f"/api/documents/{source_doc.id}/", + data={ + "custom_fields": [ + { + "field": custom_field_doclink.id, + "value": [target_doc.id], + }, + ], + }, + format="json", + ) + + self.assertEqual(resp.status_code, status.HTTP_403_FORBIDDEN) + self.assertEqual( + CustomFieldInstance.objects.filter(field=custom_field_doclink).count(), + 0, + ) + + def test_documentlink_patch_allowed_with_change_permission_on_target_documents( + self, + ): + source_owner = User.objects.create_user(username="source-owner") + source_owner.user_permissions.add( + Permission.objects.get(codename="change_document"), + ) + other_user = User.objects.create_user(username="other-user") + + source_doc = Document.objects.create( + title="Source", + checksum="source", + mime_type="application/pdf", + owner=source_owner, + ) + target_doc = Document.objects.create( + title="Target", + checksum="target", + mime_type="application/pdf", + owner=other_user, + ) + custom_field_doclink = CustomField.objects.create( + name="Test Custom Field Doc Link", + data_type=CustomField.FieldDataType.DOCUMENTLINK, + ) + + assign_perm("change_document", source_owner, target_doc) + self.client.force_authenticate(user=source_owner) + + resp = self.client.patch( + f"/api/documents/{source_doc.id}/", + data={ + "custom_fields": [ + { + "field": custom_field_doclink.id, + "value": [target_doc.id], + }, + ], + }, + format="json", + ) + + self.assertEqual(resp.status_code, status.HTTP_200_OK) + target_doc.refresh_from_db() + self.assertEqual( + target_doc.custom_fields.get(field=custom_field_doclink).value, + [source_doc.id], + ) + def test_custom_field_filters(self): custom_field_string = CustomField.objects.create( name="Test Custom Field String",