Use source mode to determine which docs bulk edit use

This commit is contained in:
shamoon
2026-03-03 10:54:52 -08:00
parent 0031036b03
commit 244ffd331f
2 changed files with 195 additions and 58 deletions

View File

@@ -26,9 +26,12 @@ from documents.models import StoragePath
from documents.models import Tag
from documents.permissions import set_permissions_for_object
from documents.plugins.helpers import DocumentsStatusManager
from documents.serialisers import SourceModeChoices
from documents.tasks import bulk_update_documents
from documents.tasks import consume_file
from documents.tasks import update_document_content_maybe_archive_file
from documents.versioning import get_latest_version_for_root
from documents.versioning import get_root_document
if TYPE_CHECKING:
from django.contrib.auth.models import User
@@ -97,23 +100,29 @@ def _get_root_and_current_docs_by_root_id(
"owner",
)
}
latest_versions_by_root_id: dict[int, Document] = {}
for version_doc in Document.objects.filter(root_document_id__in=root_ids).order_by(
"root_document_id",
"-id",
):
root_id = version_doc.root_document_id
if root_id is None:
continue
latest_versions_by_root_id.setdefault(root_id, version_doc)
current_docs: dict[int, Document] = {
root_id: latest_versions_by_root_id.get(root_id, root_docs[root_id])
for root_id in root_docs
}
current_docs: dict[int, Document] = {}
for root_id, root_doc in root_docs.items():
current_docs[root_id] = get_latest_version_for_root(root_doc)
return root_docs, current_docs
def _resolve_root_and_source_doc(
doc: Document,
*,
source_mode: SourceModeChoices = SourceModeChoices.LATEST_VERSION,
) -> tuple[Document, Document]:
root_doc = get_root_document(doc)
if source_mode == SourceModeChoices.EXPLICIT_SELECTION:
return root_doc, doc
# Version IDs are explicit by default, only a selected root resolves to latest
if doc.root_document_id is not None:
return root_doc, doc
return root_doc, get_latest_version_for_root(root_doc)
def set_correspondent(
doc_ids: list[int],
correspondent: Correspondent,
@@ -421,21 +430,32 @@ def rotate(
doc_ids: list[int],
degrees: int,
*,
source_mode: SourceModeChoices = SourceModeChoices.LATEST_VERSION,
user: User | None = None,
) -> Literal["OK"]:
logger.info(
f"Attempting to rotate {len(doc_ids)} documents by {degrees} degrees.",
)
doc_to_root_id = _get_root_ids_by_doc_id(doc_ids)
root_ids = set(doc_to_root_id.values())
root_docs_by_id, current_docs_by_root_id = _get_root_and_current_docs_by_root_id(
root_ids,
)
docs_by_id = {
doc.id: doc
for doc in Document.objects.select_related("root_document").filter(
id__in=doc_ids,
)
}
docs_by_root_id: dict[int, tuple[Document, Document]] = {}
for doc_id in doc_ids:
doc = docs_by_id.get(doc_id)
if doc is None:
continue
root_doc, source_doc = _resolve_root_and_source_doc(
doc,
source_mode=source_mode,
)
docs_by_root_id.setdefault(root_doc.id, (root_doc, source_doc))
import pikepdf
for root_id in root_ids:
root_doc = root_docs_by_id[root_id]
source_doc = current_docs_by_root_id[root_id]
for root_doc, source_doc in docs_by_root_id.values():
if source_doc.mime_type != "application/pdf":
logger.warning(
f"Document {root_doc.id} is not a PDF, skipping rotation.",
@@ -659,25 +679,17 @@ def delete_pages(
doc_ids: list[int],
pages: list[int],
*,
source_mode: SourceModeChoices = SourceModeChoices.LATEST_VERSION,
user: User | None = None,
) -> Literal["OK"]:
logger.info(
f"Attempting to delete pages {pages} from {len(doc_ids)} documents",
)
doc = Document.objects.select_related("root_document").get(id=doc_ids[0])
root_doc: Document
if doc.root_document_id is None or doc.root_document is None:
root_doc = doc
else:
root_doc = doc.root_document
source_doc = (
Document.objects.filter(Q(id=root_doc.id) | Q(root_document=root_doc))
.order_by("-id")
.first()
root_doc, source_doc = _resolve_root_and_source_doc(
doc,
source_mode=source_mode,
)
if source_doc is None:
source_doc = root_doc
pages = sorted(pages) # sort pages to avoid index issues
import pikepdf
@@ -722,6 +734,7 @@ def edit_pdf(
delete_original: bool = False,
update_document: bool = False,
include_metadata: bool = True,
source_mode: SourceModeChoices = SourceModeChoices.LATEST_VERSION,
user: User | None = None,
) -> Literal["OK"]:
"""
@@ -736,19 +749,10 @@ def edit_pdf(
f"Editing PDF of document {doc_ids[0]} with {len(operations)} operations",
)
doc = Document.objects.select_related("root_document").get(id=doc_ids[0])
root_doc: Document
if doc.root_document_id is None or doc.root_document is None:
root_doc = doc
else:
root_doc = doc.root_document
source_doc = (
Document.objects.filter(Q(id=root_doc.id) | Q(root_document=root_doc))
.order_by("-id")
.first()
root_doc, source_doc = _resolve_root_and_source_doc(
doc,
source_mode=source_mode,
)
if source_doc is None:
source_doc = root_doc
import pikepdf
pdf_docs: list[pikepdf.Pdf] = []
@@ -859,6 +863,7 @@ def remove_password(
update_document: bool = False,
delete_original: bool = False,
include_metadata: bool = True,
source_mode: SourceModeChoices = SourceModeChoices.LATEST_VERSION,
user: User | None = None,
) -> Literal["OK"]:
"""
@@ -868,19 +873,10 @@ def remove_password(
for doc_id in doc_ids:
doc = Document.objects.select_related("root_document").get(id=doc_id)
root_doc: Document
if doc.root_document_id is None or doc.root_document is None:
root_doc = doc
else:
root_doc = doc.root_document
source_doc = (
Document.objects.filter(Q(id=root_doc.id) | Q(root_document=root_doc))
.order_by("-id")
.first()
root_doc, source_doc = _resolve_root_and_source_doc(
doc,
source_mode=source_mode,
)
if source_doc is None:
source_doc = root_doc
try:
logger.info(
f"Attempting password removal from document {doc_ids[0]}",

View File

@@ -430,6 +430,29 @@ class TestBulkEdit(DirectoriesMixin, TestCase):
self.assertEqual(root_docs[self.doc2.id].id, self.doc2.id)
self.assertEqual(current_docs[self.doc2.id].id, version2.id)
def test_resolve_root_and_source_doc_latest_version_prefers_newest_version(
self,
) -> None:
version1 = Document.objects.create(
checksum="B-v1",
title="B version 1",
root_document=self.doc2,
)
version2 = Document.objects.create(
checksum="B-v2",
title="B version 2",
root_document=self.doc2,
)
root_doc, source_doc = bulk_edit._resolve_root_and_source_doc(
self.doc2,
source_mode="latest_version",
)
self.assertEqual(root_doc.id, self.doc2.id)
self.assertEqual(source_doc.id, version2.id)
self.assertNotEqual(source_doc.id, version1.id)
@mock.patch("documents.tasks.bulk_update_documents.delay")
def test_set_permissions(self, m) -> None:
doc_ids = [self.doc1.id, self.doc2.id, self.doc3.id]
@@ -1041,6 +1064,34 @@ class TestPDFActions(DirectoriesMixin, TestCase):
self.assertIsNotNone(overrides)
self.assertEqual(result, "OK")
@mock.patch("documents.data_models.magic.from_file", return_value="application/pdf")
@mock.patch("documents.tasks.consume_file.delay")
@mock.patch("pikepdf.open")
def test_rotate_explicit_selection_uses_root_source_when_root_selected(
self,
mock_open,
mock_consume_delay,
mock_magic,
):
Document.objects.create(
checksum="B-v1",
title="B version 1",
root_document=self.doc2,
)
fake_pdf = mock.MagicMock()
fake_pdf.pages = [mock.Mock()]
mock_open.return_value.__enter__.return_value = fake_pdf
result = bulk_edit.rotate(
[self.doc2.id],
90,
source_mode="explicit_selection",
)
self.assertEqual(result, "OK")
mock_open.assert_called_once_with(self.doc2.source_path)
mock_consume_delay.assert_called_once()
@mock.patch("documents.tasks.consume_file.delay")
@mock.patch("pikepdf.Pdf.save")
@mock.patch("documents.data_models.magic.from_file", return_value="application/pdf")
@@ -1065,6 +1116,34 @@ class TestPDFActions(DirectoriesMixin, TestCase):
self.assertIsNotNone(overrides)
self.assertEqual(result, "OK")
@mock.patch("documents.data_models.magic.from_file", return_value="application/pdf")
@mock.patch("documents.tasks.consume_file.delay")
@mock.patch("pikepdf.open")
def test_delete_pages_explicit_selection_uses_root_source_when_root_selected(
self,
mock_open,
mock_consume_delay,
mock_magic,
):
Document.objects.create(
checksum="B-v1",
title="B version 1",
root_document=self.doc2,
)
fake_pdf = mock.MagicMock()
fake_pdf.pages = [mock.Mock(), mock.Mock()]
mock_open.return_value.__enter__.return_value = fake_pdf
result = bulk_edit.delete_pages(
[self.doc2.id],
[1],
source_mode="explicit_selection",
)
self.assertEqual(result, "OK")
mock_open.assert_called_once_with(self.doc2.source_path)
mock_consume_delay.assert_called_once()
@mock.patch("documents.tasks.consume_file.delay")
@mock.patch("pikepdf.Pdf.save")
def test_delete_pages_with_error(self, mock_pdf_save, mock_consume_delay):
@@ -1213,6 +1292,40 @@ class TestPDFActions(DirectoriesMixin, TestCase):
self.assertTrue(str(consumable.original_file).endswith("_edited.pdf"))
self.assertIsNotNone(overrides)
@mock.patch("documents.data_models.magic.from_file", return_value="application/pdf")
@mock.patch("documents.tasks.consume_file.delay")
@mock.patch("pikepdf.new")
@mock.patch("pikepdf.open")
def test_edit_pdf_explicit_selection_uses_root_source_when_root_selected(
self,
mock_open,
mock_new,
mock_consume_delay,
mock_magic,
):
Document.objects.create(
checksum="B-v1",
title="B version 1",
root_document=self.doc2,
)
fake_pdf = mock.MagicMock()
fake_pdf.pages = [mock.Mock()]
mock_open.return_value.__enter__.return_value = fake_pdf
output_pdf = mock.MagicMock()
output_pdf.pages = []
mock_new.return_value = output_pdf
result = bulk_edit.edit_pdf(
[self.doc2.id],
operations=[{"page": 1}],
update_document=True,
source_mode="explicit_selection",
)
self.assertEqual(result, "OK")
mock_open.assert_called_once_with(self.doc2.source_path)
mock_consume_delay.assert_called_once()
@mock.patch("documents.bulk_edit.group")
@mock.patch("documents.tasks.consume_file.s")
def test_edit_pdf_without_metadata(
@@ -1333,6 +1446,34 @@ class TestPDFActions(DirectoriesMixin, TestCase):
self.assertEqual(consumable.root_document_id, doc.id)
self.assertIsNotNone(overrides)
@mock.patch("documents.data_models.magic.from_file", return_value="application/pdf")
@mock.patch("documents.tasks.consume_file.delay")
@mock.patch("pikepdf.open")
def test_remove_password_explicit_selection_uses_root_source_when_root_selected(
self,
mock_open,
mock_consume_delay,
mock_magic,
) -> None:
Document.objects.create(
checksum="A-v1",
title="A version 1",
root_document=self.doc1,
)
fake_pdf = mock.MagicMock()
mock_open.return_value.__enter__.return_value = fake_pdf
result = bulk_edit.remove_password(
[self.doc1.id],
password="secret",
update_document=True,
source_mode="explicit_selection",
)
self.assertEqual(result, "OK")
mock_open.assert_called_once_with(self.doc1.source_path, password="secret")
mock_consume_delay.assert_called_once()
@mock.patch("documents.bulk_edit.chord")
@mock.patch("documents.bulk_edit.group")
@mock.patch("documents.tasks.consume_file.s")