From 244ffd331fd9a3c43bb5f17d33d284b6d9b9ff25 Mon Sep 17 00:00:00 2001 From: shamoon <4887959+shamoon@users.noreply.github.com> Date: Tue, 3 Mar 2026 10:54:52 -0800 Subject: [PATCH] Use source mode to determine which docs bulk edit use --- src/documents/bulk_edit.py | 112 ++++++++++---------- src/documents/tests/test_bulk_edit.py | 141 ++++++++++++++++++++++++++ 2 files changed, 195 insertions(+), 58 deletions(-) diff --git a/src/documents/bulk_edit.py b/src/documents/bulk_edit.py index 0a7803944..c2c219ec2 100644 --- a/src/documents/bulk_edit.py +++ b/src/documents/bulk_edit.py @@ -26,9 +26,12 @@ from documents.models import StoragePath from documents.models import Tag from documents.permissions import set_permissions_for_object from documents.plugins.helpers import DocumentsStatusManager +from documents.serialisers import SourceModeChoices from documents.tasks import bulk_update_documents from documents.tasks import consume_file from documents.tasks import update_document_content_maybe_archive_file +from documents.versioning import get_latest_version_for_root +from documents.versioning import get_root_document if TYPE_CHECKING: from django.contrib.auth.models import User @@ -97,23 +100,29 @@ def _get_root_and_current_docs_by_root_id( "owner", ) } - latest_versions_by_root_id: dict[int, Document] = {} - for version_doc in Document.objects.filter(root_document_id__in=root_ids).order_by( - "root_document_id", - "-id", - ): - root_id = version_doc.root_document_id - if root_id is None: - continue - latest_versions_by_root_id.setdefault(root_id, version_doc) - - current_docs: dict[int, Document] = { - root_id: latest_versions_by_root_id.get(root_id, root_docs[root_id]) - for root_id in root_docs - } + current_docs: dict[int, Document] = {} + for root_id, root_doc in root_docs.items(): + current_docs[root_id] = get_latest_version_for_root(root_doc) return root_docs, current_docs +def _resolve_root_and_source_doc( + doc: Document, + *, + source_mode: SourceModeChoices = SourceModeChoices.LATEST_VERSION, +) -> tuple[Document, Document]: + root_doc = get_root_document(doc) + + if source_mode == SourceModeChoices.EXPLICIT_SELECTION: + return root_doc, doc + + # Version IDs are explicit by default, only a selected root resolves to latest + if doc.root_document_id is not None: + return root_doc, doc + + return root_doc, get_latest_version_for_root(root_doc) + + def set_correspondent( doc_ids: list[int], correspondent: Correspondent, @@ -421,21 +430,32 @@ def rotate( doc_ids: list[int], degrees: int, *, + source_mode: SourceModeChoices = SourceModeChoices.LATEST_VERSION, user: User | None = None, ) -> Literal["OK"]: logger.info( f"Attempting to rotate {len(doc_ids)} documents by {degrees} degrees.", ) - doc_to_root_id = _get_root_ids_by_doc_id(doc_ids) - root_ids = set(doc_to_root_id.values()) - root_docs_by_id, current_docs_by_root_id = _get_root_and_current_docs_by_root_id( - root_ids, - ) + docs_by_id = { + doc.id: doc + for doc in Document.objects.select_related("root_document").filter( + id__in=doc_ids, + ) + } + docs_by_root_id: dict[int, tuple[Document, Document]] = {} + for doc_id in doc_ids: + doc = docs_by_id.get(doc_id) + if doc is None: + continue + root_doc, source_doc = _resolve_root_and_source_doc( + doc, + source_mode=source_mode, + ) + docs_by_root_id.setdefault(root_doc.id, (root_doc, source_doc)) + import pikepdf - for root_id in root_ids: - root_doc = root_docs_by_id[root_id] - source_doc = current_docs_by_root_id[root_id] + for root_doc, source_doc in docs_by_root_id.values(): if source_doc.mime_type != "application/pdf": logger.warning( f"Document {root_doc.id} is not a PDF, skipping rotation.", @@ -659,25 +679,17 @@ def delete_pages( doc_ids: list[int], pages: list[int], *, + source_mode: SourceModeChoices = SourceModeChoices.LATEST_VERSION, user: User | None = None, ) -> Literal["OK"]: logger.info( f"Attempting to delete pages {pages} from {len(doc_ids)} documents", ) doc = Document.objects.select_related("root_document").get(id=doc_ids[0]) - root_doc: Document - if doc.root_document_id is None or doc.root_document is None: - root_doc = doc - else: - root_doc = doc.root_document - - source_doc = ( - Document.objects.filter(Q(id=root_doc.id) | Q(root_document=root_doc)) - .order_by("-id") - .first() + root_doc, source_doc = _resolve_root_and_source_doc( + doc, + source_mode=source_mode, ) - if source_doc is None: - source_doc = root_doc pages = sorted(pages) # sort pages to avoid index issues import pikepdf @@ -722,6 +734,7 @@ def edit_pdf( delete_original: bool = False, update_document: bool = False, include_metadata: bool = True, + source_mode: SourceModeChoices = SourceModeChoices.LATEST_VERSION, user: User | None = None, ) -> Literal["OK"]: """ @@ -736,19 +749,10 @@ def edit_pdf( f"Editing PDF of document {doc_ids[0]} with {len(operations)} operations", ) doc = Document.objects.select_related("root_document").get(id=doc_ids[0]) - root_doc: Document - if doc.root_document_id is None or doc.root_document is None: - root_doc = doc - else: - root_doc = doc.root_document - - source_doc = ( - Document.objects.filter(Q(id=root_doc.id) | Q(root_document=root_doc)) - .order_by("-id") - .first() + root_doc, source_doc = _resolve_root_and_source_doc( + doc, + source_mode=source_mode, ) - if source_doc is None: - source_doc = root_doc import pikepdf pdf_docs: list[pikepdf.Pdf] = [] @@ -859,6 +863,7 @@ def remove_password( update_document: bool = False, delete_original: bool = False, include_metadata: bool = True, + source_mode: SourceModeChoices = SourceModeChoices.LATEST_VERSION, user: User | None = None, ) -> Literal["OK"]: """ @@ -868,19 +873,10 @@ def remove_password( for doc_id in doc_ids: doc = Document.objects.select_related("root_document").get(id=doc_id) - root_doc: Document - if doc.root_document_id is None or doc.root_document is None: - root_doc = doc - else: - root_doc = doc.root_document - - source_doc = ( - Document.objects.filter(Q(id=root_doc.id) | Q(root_document=root_doc)) - .order_by("-id") - .first() + root_doc, source_doc = _resolve_root_and_source_doc( + doc, + source_mode=source_mode, ) - if source_doc is None: - source_doc = root_doc try: logger.info( f"Attempting password removal from document {doc_ids[0]}", diff --git a/src/documents/tests/test_bulk_edit.py b/src/documents/tests/test_bulk_edit.py index 40e247412..fa4fa3449 100644 --- a/src/documents/tests/test_bulk_edit.py +++ b/src/documents/tests/test_bulk_edit.py @@ -430,6 +430,29 @@ class TestBulkEdit(DirectoriesMixin, TestCase): self.assertEqual(root_docs[self.doc2.id].id, self.doc2.id) self.assertEqual(current_docs[self.doc2.id].id, version2.id) + def test_resolve_root_and_source_doc_latest_version_prefers_newest_version( + self, + ) -> None: + version1 = Document.objects.create( + checksum="B-v1", + title="B version 1", + root_document=self.doc2, + ) + version2 = Document.objects.create( + checksum="B-v2", + title="B version 2", + root_document=self.doc2, + ) + + root_doc, source_doc = bulk_edit._resolve_root_and_source_doc( + self.doc2, + source_mode="latest_version", + ) + + self.assertEqual(root_doc.id, self.doc2.id) + self.assertEqual(source_doc.id, version2.id) + self.assertNotEqual(source_doc.id, version1.id) + @mock.patch("documents.tasks.bulk_update_documents.delay") def test_set_permissions(self, m) -> None: doc_ids = [self.doc1.id, self.doc2.id, self.doc3.id] @@ -1041,6 +1064,34 @@ class TestPDFActions(DirectoriesMixin, TestCase): self.assertIsNotNone(overrides) self.assertEqual(result, "OK") + @mock.patch("documents.data_models.magic.from_file", return_value="application/pdf") + @mock.patch("documents.tasks.consume_file.delay") + @mock.patch("pikepdf.open") + def test_rotate_explicit_selection_uses_root_source_when_root_selected( + self, + mock_open, + mock_consume_delay, + mock_magic, + ): + Document.objects.create( + checksum="B-v1", + title="B version 1", + root_document=self.doc2, + ) + fake_pdf = mock.MagicMock() + fake_pdf.pages = [mock.Mock()] + mock_open.return_value.__enter__.return_value = fake_pdf + + result = bulk_edit.rotate( + [self.doc2.id], + 90, + source_mode="explicit_selection", + ) + + self.assertEqual(result, "OK") + mock_open.assert_called_once_with(self.doc2.source_path) + mock_consume_delay.assert_called_once() + @mock.patch("documents.tasks.consume_file.delay") @mock.patch("pikepdf.Pdf.save") @mock.patch("documents.data_models.magic.from_file", return_value="application/pdf") @@ -1065,6 +1116,34 @@ class TestPDFActions(DirectoriesMixin, TestCase): self.assertIsNotNone(overrides) self.assertEqual(result, "OK") + @mock.patch("documents.data_models.magic.from_file", return_value="application/pdf") + @mock.patch("documents.tasks.consume_file.delay") + @mock.patch("pikepdf.open") + def test_delete_pages_explicit_selection_uses_root_source_when_root_selected( + self, + mock_open, + mock_consume_delay, + mock_magic, + ): + Document.objects.create( + checksum="B-v1", + title="B version 1", + root_document=self.doc2, + ) + fake_pdf = mock.MagicMock() + fake_pdf.pages = [mock.Mock(), mock.Mock()] + mock_open.return_value.__enter__.return_value = fake_pdf + + result = bulk_edit.delete_pages( + [self.doc2.id], + [1], + source_mode="explicit_selection", + ) + + self.assertEqual(result, "OK") + mock_open.assert_called_once_with(self.doc2.source_path) + mock_consume_delay.assert_called_once() + @mock.patch("documents.tasks.consume_file.delay") @mock.patch("pikepdf.Pdf.save") def test_delete_pages_with_error(self, mock_pdf_save, mock_consume_delay): @@ -1213,6 +1292,40 @@ class TestPDFActions(DirectoriesMixin, TestCase): self.assertTrue(str(consumable.original_file).endswith("_edited.pdf")) self.assertIsNotNone(overrides) + @mock.patch("documents.data_models.magic.from_file", return_value="application/pdf") + @mock.patch("documents.tasks.consume_file.delay") + @mock.patch("pikepdf.new") + @mock.patch("pikepdf.open") + def test_edit_pdf_explicit_selection_uses_root_source_when_root_selected( + self, + mock_open, + mock_new, + mock_consume_delay, + mock_magic, + ): + Document.objects.create( + checksum="B-v1", + title="B version 1", + root_document=self.doc2, + ) + fake_pdf = mock.MagicMock() + fake_pdf.pages = [mock.Mock()] + mock_open.return_value.__enter__.return_value = fake_pdf + output_pdf = mock.MagicMock() + output_pdf.pages = [] + mock_new.return_value = output_pdf + + result = bulk_edit.edit_pdf( + [self.doc2.id], + operations=[{"page": 1}], + update_document=True, + source_mode="explicit_selection", + ) + + self.assertEqual(result, "OK") + mock_open.assert_called_once_with(self.doc2.source_path) + mock_consume_delay.assert_called_once() + @mock.patch("documents.bulk_edit.group") @mock.patch("documents.tasks.consume_file.s") def test_edit_pdf_without_metadata( @@ -1333,6 +1446,34 @@ class TestPDFActions(DirectoriesMixin, TestCase): self.assertEqual(consumable.root_document_id, doc.id) self.assertIsNotNone(overrides) + @mock.patch("documents.data_models.magic.from_file", return_value="application/pdf") + @mock.patch("documents.tasks.consume_file.delay") + @mock.patch("pikepdf.open") + def test_remove_password_explicit_selection_uses_root_source_when_root_selected( + self, + mock_open, + mock_consume_delay, + mock_magic, + ) -> None: + Document.objects.create( + checksum="A-v1", + title="A version 1", + root_document=self.doc1, + ) + fake_pdf = mock.MagicMock() + mock_open.return_value.__enter__.return_value = fake_pdf + + result = bulk_edit.remove_password( + [self.doc1.id], + password="secret", + update_document=True, + source_mode="explicit_selection", + ) + + self.assertEqual(result, "OK") + mock_open.assert_called_once_with(self.doc1.source_path, password="secret") + mock_consume_delay.assert_called_once() + @mock.patch("documents.bulk_edit.chord") @mock.patch("documents.bulk_edit.group") @mock.patch("documents.tasks.consume_file.s")