mirror of
https://github.com/paperless-ngx/paperless-ngx.git
synced 2026-07-04 03:04:18 +00:00
Support all for BulkEditObjectsView
This commit is contained in:
@@ -2602,13 +2602,25 @@ class ShareLinkBundleSerializer(OwnedObjectSerializer):
|
||||
|
||||
class BulkEditObjectsSerializer(SerializerWithPerms, SetPermissionsMixin):
|
||||
objects = serializers.ListField(
|
||||
required=True,
|
||||
allow_empty=False,
|
||||
required=False,
|
||||
allow_empty=True,
|
||||
label="Objects",
|
||||
write_only=True,
|
||||
child=serializers.IntegerField(),
|
||||
)
|
||||
|
||||
all = serializers.BooleanField(
|
||||
default=False,
|
||||
required=False,
|
||||
write_only=True,
|
||||
)
|
||||
|
||||
filters = serializers.DictField(
|
||||
required=False,
|
||||
allow_empty=True,
|
||||
write_only=True,
|
||||
)
|
||||
|
||||
object_type = serializers.ChoiceField(
|
||||
choices=[
|
||||
"tags",
|
||||
@@ -2681,10 +2693,20 @@ class BulkEditObjectsSerializer(SerializerWithPerms, SetPermissionsMixin):
|
||||
|
||||
def validate(self, attrs):
|
||||
object_type = attrs["object_type"]
|
||||
objects = attrs["objects"]
|
||||
objects = attrs.get("objects")
|
||||
apply_to_all = attrs.get("all", False)
|
||||
operation = attrs.get("operation")
|
||||
|
||||
self._validate_objects(objects, object_type)
|
||||
if apply_to_all:
|
||||
attrs.setdefault("objects", [])
|
||||
else:
|
||||
if objects is None:
|
||||
raise serializers.ValidationError(
|
||||
"objects is required unless all is true.",
|
||||
)
|
||||
if len(objects) == 0:
|
||||
raise serializers.ValidationError("objects must not be empty")
|
||||
self._validate_objects(objects, object_type)
|
||||
|
||||
if operation == "set_permissions":
|
||||
permissions = attrs.get("permissions")
|
||||
|
||||
@@ -810,6 +810,62 @@ class TestBulkEditObjects(APITestCase):
|
||||
self.assertEqual(response.status_code, status.HTTP_200_OK)
|
||||
self.assertEqual(StoragePath.objects.count(), 0)
|
||||
|
||||
def test_bulk_objects_delete_all_filtered(self) -> None:
|
||||
"""
|
||||
GIVEN:
|
||||
- Existing objects that can be filtered by name
|
||||
WHEN:
|
||||
- bulk_edit_objects API endpoint is called with all=true and filters
|
||||
THEN:
|
||||
- Matching objects are deleted without passing explicit IDs
|
||||
"""
|
||||
Correspondent.objects.create(name="c2")
|
||||
|
||||
response = self.client.post(
|
||||
"/api/bulk_edit_objects/",
|
||||
json.dumps(
|
||||
{
|
||||
"all": True,
|
||||
"filters": {"name__icontains": "c"},
|
||||
"object_type": "correspondents",
|
||||
"operation": "delete",
|
||||
},
|
||||
),
|
||||
content_type="application/json",
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, status.HTTP_200_OK)
|
||||
self.assertEqual(Correspondent.objects.count(), 0)
|
||||
|
||||
def test_bulk_objects_delete_all_filtered_tags_includes_descendants(self) -> None:
|
||||
"""
|
||||
GIVEN:
|
||||
- Root tag with descendants
|
||||
WHEN:
|
||||
- bulk_edit_objects API endpoint is called with all=true
|
||||
THEN:
|
||||
- Root tags and descendants are deleted
|
||||
"""
|
||||
parent = Tag.objects.create(name="parent")
|
||||
child = Tag.objects.create(name="child", tn_parent=parent)
|
||||
|
||||
response = self.client.post(
|
||||
"/api/bulk_edit_objects/",
|
||||
json.dumps(
|
||||
{
|
||||
"all": True,
|
||||
"filters": {"is_root": True},
|
||||
"object_type": "tags",
|
||||
"operation": "delete",
|
||||
},
|
||||
),
|
||||
content_type="application/json",
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, status.HTTP_200_OK)
|
||||
self.assertFalse(Tag.objects.filter(id=parent.id).exists())
|
||||
self.assertFalse(Tag.objects.filter(id=child.id).exists())
|
||||
|
||||
def test_bulk_edit_object_permissions_insufficient_global_perms(self) -> None:
|
||||
"""
|
||||
GIVEN:
|
||||
@@ -897,3 +953,40 @@ class TestBulkEditObjects(APITestCase):
|
||||
|
||||
self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
|
||||
self.assertEqual(response.content, b"Insufficient permissions")
|
||||
|
||||
def test_bulk_edit_all_filtered_permissions_insufficient_object_perms(
|
||||
self,
|
||||
) -> None:
|
||||
"""
|
||||
GIVEN:
|
||||
- Filter-matching objects include one that the user cannot edit
|
||||
WHEN:
|
||||
- bulk_edit_objects API endpoint is called with all=true
|
||||
THEN:
|
||||
- Operation applies only to editable objects
|
||||
"""
|
||||
self.t2.owner = User.objects.get(username="temp_admin")
|
||||
self.t2.save()
|
||||
|
||||
self.user1.user_permissions.add(
|
||||
*Permission.objects.filter(codename="delete_tag"),
|
||||
)
|
||||
self.user1.save()
|
||||
self.client.force_authenticate(user=self.user1)
|
||||
|
||||
response = self.client.post(
|
||||
"/api/bulk_edit_objects/",
|
||||
json.dumps(
|
||||
{
|
||||
"all": True,
|
||||
"filters": {"name__icontains": "t"},
|
||||
"object_type": "tags",
|
||||
"operation": "delete",
|
||||
},
|
||||
),
|
||||
content_type="application/json",
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, status.HTTP_200_OK)
|
||||
self.assertTrue(Tag.objects.filter(id=self.t2.id).exists())
|
||||
self.assertFalse(Tag.objects.filter(id=self.t1.id).exists())
|
||||
|
||||
+43
-8
@@ -3921,20 +3921,55 @@ class BulkEditObjectsView(PassUserMixin):
|
||||
user = self.request.user
|
||||
object_type = serializer.validated_data.get("object_type")
|
||||
object_ids = serializer.validated_data.get("objects")
|
||||
apply_to_all = serializer.validated_data.get("all")
|
||||
object_class = serializer.get_object_class(object_type)
|
||||
operation = serializer.validated_data.get("operation")
|
||||
model_name = object_class._meta.model_name
|
||||
perm_codename = (
|
||||
f"change_{model_name}"
|
||||
if operation == "set_permissions"
|
||||
else f"delete_{model_name}"
|
||||
)
|
||||
|
||||
objs = object_class.objects.select_related("owner").filter(pk__in=object_ids)
|
||||
if apply_to_all:
|
||||
# Support all to avoid sending large lists of ids for bulk operations, with optional filters
|
||||
filters = serializer.validated_data.get("filters") or {}
|
||||
filterset_class = {
|
||||
"tags": TagFilterSet,
|
||||
"correspondents": CorrespondentFilterSet,
|
||||
"document_types": DocumentTypeFilterSet,
|
||||
"storage_paths": StoragePathFilterSet,
|
||||
}[object_type]
|
||||
user_permitted_objects = get_objects_for_user_owner_aware(
|
||||
user,
|
||||
perm_codename,
|
||||
object_class,
|
||||
)
|
||||
objs = filterset_class(
|
||||
data=filters,
|
||||
queryset=user_permitted_objects,
|
||||
).qs
|
||||
if object_type == "tags":
|
||||
editable_ids = set(user_permitted_objects.values_list("pk", flat=True))
|
||||
all_ids = set(objs.values_list("pk", flat=True))
|
||||
for tag in objs:
|
||||
all_ids.update(
|
||||
descendant.pk
|
||||
for descendant in tag.get_descendants()
|
||||
if descendant.pk in editable_ids
|
||||
)
|
||||
objs = object_class.objects.filter(pk__in=all_ids)
|
||||
objs = objs.select_related("owner")
|
||||
object_ids = list(objs.values_list("pk", flat=True))
|
||||
else:
|
||||
objs = object_class.objects.select_related("owner").filter(
|
||||
pk__in=object_ids,
|
||||
)
|
||||
|
||||
if not user.is_superuser:
|
||||
model_name = object_class._meta.model_name
|
||||
perm = (
|
||||
f"documents.change_{model_name}"
|
||||
if operation == "set_permissions"
|
||||
else f"documents.delete_{model_name}"
|
||||
)
|
||||
perm = f"documents.{perm_codename}"
|
||||
has_perms = user.has_perm(perm) and all(
|
||||
(obj.owner == user or obj.owner is None) for obj in objs
|
||||
has_perms_owner_aware(user, perm_codename, obj) for obj in objs
|
||||
)
|
||||
|
||||
if not has_perms:
|
||||
|
||||
Reference in New Issue
Block a user