Use a few named tuples and data classes instead of so much unpacking

This commit is contained in:
Trenton Holmes
2026-05-02 20:00:10 -07:00
parent b38d112691
commit 09d1f09734
2 changed files with 128 additions and 93 deletions
+55 -64
View File
@@ -5,6 +5,7 @@ import tempfile
from pathlib import Path
from typing import TYPE_CHECKING
from typing import Literal
from typing import NamedTuple
from celery import chord
from celery import group
@@ -46,6 +47,11 @@ class SourceModeChoices:
EXPLICIT_SELECTION: SourceMode = "explicit_selection"
class ResolvedDocPair(NamedTuple):
    """Pair of documents produced by ``_resolve_root_and_source_doc``.

    Groups the root of a version chain together with the concrete version
    that should be used as the data source for an operation, replacing the
    previous anonymous ``tuple[Document, Document]`` return value.
    """

    # Root document of the version chain (result of get_root_document()).
    root_doc: Document
    # The version whose file contents the operation should read; either the
    # explicitly selected document or the latest version of the root,
    # depending on the source mode.
    source_doc: Document
@shared_task(bind=True)
def restore_archive_serial_numbers_task(
self,
@@ -86,17 +92,20 @@ def _resolve_root_and_source_doc(
doc: Document,
*,
source_mode: SourceMode = SourceModeChoices.LATEST_VERSION,
) -> tuple[Document, Document]:
) -> ResolvedDocPair:
root_doc = get_root_document(doc)
if source_mode == SourceModeChoices.EXPLICIT_SELECTION:
return root_doc, doc
return ResolvedDocPair(root_doc=root_doc, source_doc=doc)
# Version IDs are explicit by default, only a selected root resolves to latest
if doc.root_document_id is not None:
return root_doc, doc
return ResolvedDocPair(root_doc=root_doc, source_doc=doc)
return root_doc, get_latest_version_for_root(root_doc)
return ResolvedDocPair(
root_doc=root_doc,
source_doc=get_latest_version_for_root(root_doc),
)
def set_correspondent(
@@ -442,39 +451,36 @@ def rotate(
id__in=doc_ids,
)
}
docs_by_root_id: dict[int, tuple[Document, Document]] = {}
docs_by_root_id: dict[int, ResolvedDocPair] = {}
for doc_id in doc_ids:
doc = docs_by_id.get(doc_id)
if doc is None:
continue
root_doc, source_doc = _resolve_root_and_source_doc(
doc,
source_mode=source_mode,
)
docs_by_root_id.setdefault(root_doc.id, (root_doc, source_doc))
pair = _resolve_root_and_source_doc(doc, source_mode=source_mode)
docs_by_root_id.setdefault(pair.root_doc.id, pair)
import pikepdf
for root_doc, source_doc in docs_by_root_id.values():
if source_doc.mime_type != "application/pdf":
for pair in docs_by_root_id.values():
if pair.source_doc.mime_type != "application/pdf":
logger.warning(
f"Document {root_doc.id} is not a PDF, skipping rotation.",
f"Document {pair.root_doc.id} is not a PDF, skipping rotation.",
)
continue
try:
# Write rotated output to a temp file and create a new version via consume pipeline
filepath: Path = (
Path(tempfile.mkdtemp(dir=settings.SCRATCH_DIR))
/ f"{root_doc.id}_rotated.pdf"
/ f"{pair.root_doc.id}_rotated.pdf"
)
with pikepdf.open(source_doc.source_path) as pdf:
with pikepdf.open(pair.source_doc.source_path) as pdf:
for page in pdf.pages:
page.rotate(degrees, relative=True)
pdf.remove_unreferenced_resources()
pdf.save(filepath)
# Preserve metadata/permissions via overrides; mark as new version
overrides = DocumentMetadataOverrides().from_document(root_doc)
overrides = DocumentMetadataOverrides().from_document(pair.root_doc)
if user is not None:
overrides.actor_id = user.id
@@ -483,17 +489,17 @@ def rotate(
"input_doc": ConsumableDocument(
source=DocumentSource.ConsumeFolder,
original_file=filepath,
root_document_id=root_doc.id,
root_document_id=pair.root_doc.id,
),
"overrides": overrides,
},
headers={"trigger_source": trigger_source},
)
logger.info(
f"Queued new rotated version for document {root_doc.id} by {degrees} degrees",
f"Queued new rotated version for document {pair.root_doc.id} by {degrees} degrees",
)
except Exception as e:
logger.exception(f"Error rotating document {root_doc.id}: {e}")
logger.exception(f"Error rotating document {pair.root_doc.id}: {e}")
return "OK"
@@ -524,17 +530,14 @@ def merge(
doc = docs_by_id.get(doc_id)
if doc is None:
continue
_, source_doc = _resolve_root_and_source_doc(
doc,
source_mode=source_mode,
)
pair = _resolve_root_and_source_doc(doc, source_mode=source_mode)
try:
doc_path = (
source_doc.archive_path
pair.source_doc.archive_path
if archive_fallback
and source_doc.mime_type != "application/pdf"
and source_doc.has_archive_version
else source_doc.source_path
and pair.source_doc.mime_type != "application/pdf"
and pair.source_doc.has_archive_version
else pair.source_doc.source_path
)
with pikepdf.open(str(doc_path)) as pdf:
version = max(version, pdf.pdf_version)
@@ -624,16 +627,13 @@ def split(
f"Attempting to split document {doc_ids[0]} into {len(pages)} documents",
)
doc = Document.objects.select_related("root_document").get(id=doc_ids[0])
_, source_doc = _resolve_root_and_source_doc(
doc,
source_mode=source_mode,
)
pair = _resolve_root_and_source_doc(doc, source_mode=source_mode)
import pikepdf
consume_tasks = []
try:
with pikepdf.open(source_doc.source_path) as pdf:
with pikepdf.open(pair.source_doc.source_path) as pdf:
for idx, split_doc in enumerate(pages):
dst: pikepdf.Pdf = pikepdf.new()
for page in split_doc:
@@ -705,10 +705,7 @@ def delete_pages(
f"Attempting to delete pages {pages} from {len(doc_ids)} documents",
)
doc = Document.objects.select_related("root_document").get(id=doc_ids[0])
root_doc, source_doc = _resolve_root_and_source_doc(
doc,
source_mode=source_mode,
)
pair = _resolve_root_and_source_doc(doc, source_mode=source_mode)
pages = sorted(pages) # sort pages to avoid index issues
import pikepdf
@@ -716,9 +713,9 @@ def delete_pages(
# Produce edited PDF to a temp file and create a new version
filepath: Path = (
Path(tempfile.mkdtemp(dir=settings.SCRATCH_DIR))
/ f"{root_doc.id}_pages_deleted.pdf"
/ f"{pair.root_doc.id}_pages_deleted.pdf"
)
with pikepdf.open(source_doc.source_path) as pdf:
with pikepdf.open(pair.source_doc.source_path) as pdf:
offset = 1 # pages are 1-indexed
for page_num in pages:
pdf.pages.remove(pdf.pages[page_num - offset])
@@ -726,7 +723,7 @@ def delete_pages(
pdf.remove_unreferenced_resources()
pdf.save(filepath)
overrides = DocumentMetadataOverrides().from_document(root_doc)
overrides = DocumentMetadataOverrides().from_document(pair.root_doc)
if user is not None:
overrides.actor_id = user.id
consume_file.apply_async(
@@ -734,17 +731,17 @@ def delete_pages(
"input_doc": ConsumableDocument(
source=DocumentSource.ConsumeFolder,
original_file=filepath,
root_document_id=root_doc.id,
root_document_id=pair.root_doc.id,
),
"overrides": overrides,
},
headers={"trigger_source": trigger_source},
)
logger.info(
f"Queued new version for document {root_doc.id} after deleting pages {pages}",
f"Queued new version for document {pair.root_doc.id} after deleting pages {pages}",
)
except Exception as e:
logger.exception(f"Error deleting pages from document {root_doc.id}: {e}")
logger.exception(f"Error deleting pages from document {pair.root_doc.id}: {e}")
return "OK"
@@ -772,16 +769,13 @@ def edit_pdf(
f"Editing PDF of document {doc_ids[0]} with {len(operations)} operations",
)
doc = Document.objects.select_related("root_document").get(id=doc_ids[0])
root_doc, source_doc = _resolve_root_and_source_doc(
doc,
source_mode=source_mode,
)
pair = _resolve_root_and_source_doc(doc, source_mode=source_mode)
import pikepdf
pdf_docs: list[pikepdf.Pdf] = []
try:
with pikepdf.open(source_doc.source_path) as src:
with pikepdf.open(pair.source_doc.source_path) as src:
# prepare output documents
max_idx = max(op.get("doc", 0) for op in operations)
pdf_docs = [pikepdf.new() for _ in range(max_idx + 1)]
@@ -805,11 +799,11 @@ def edit_pdf(
pdf.remove_unreferenced_resources()
filepath: Path = (
Path(tempfile.mkdtemp(dir=settings.SCRATCH_DIR))
/ f"{root_doc.id}_edited.pdf"
/ f"{pair.root_doc.id}_edited.pdf"
)
pdf.save(filepath)
overrides = (
DocumentMetadataOverrides().from_document(root_doc)
DocumentMetadataOverrides().from_document(pair.root_doc)
if include_metadata
else DocumentMetadataOverrides()
)
@@ -821,7 +815,7 @@ def edit_pdf(
"input_doc": ConsumableDocument(
source=DocumentSource.ConsumeFolder,
original_file=filepath,
root_document_id=root_doc.id,
root_document_id=pair.root_doc.id,
),
"overrides": overrides,
},
@@ -830,7 +824,7 @@ def edit_pdf(
else:
consume_tasks = []
overrides = (
DocumentMetadataOverrides().from_document(root_doc)
DocumentMetadataOverrides().from_document(pair.root_doc)
if include_metadata
else DocumentMetadataOverrides()
)
@@ -840,11 +834,11 @@ def edit_pdf(
if not delete_original:
overrides.skip_asn_if_exists = True
if delete_original and len(pdf_docs) == 1:
overrides.asn = root_doc.archive_serial_number
overrides.asn = pair.root_doc.archive_serial_number
for idx, pdf in enumerate(pdf_docs, start=1):
version_filepath: Path = (
Path(tempfile.mkdtemp(dir=settings.SCRATCH_DIR))
/ f"{root_doc.id}_edit_{idx}.pdf"
/ f"{pair.root_doc.id}_edit_{idx}.pdf"
)
pdf.remove_unreferenced_resources()
pdf.save(version_filepath)
@@ -874,7 +868,7 @@ def edit_pdf(
group(consume_tasks).delay()
except Exception as e:
logger.exception(f"Error editing document {root_doc.id}: {e}")
logger.exception(f"Error editing document {pair.root_doc.id}: {e}")
raise ValueError(
f"An error occurred while editing the document: {e}",
) from e
@@ -900,18 +894,15 @@ def remove_password(
for doc_id in doc_ids:
doc = Document.objects.select_related("root_document").get(id=doc_id)
root_doc, source_doc = _resolve_root_and_source_doc(
doc,
source_mode=source_mode,
)
pair = _resolve_root_and_source_doc(doc, source_mode=source_mode)
try:
logger.info(
f"Attempting password removal from document {doc_ids[0]}",
)
with pikepdf.open(source_doc.source_path, password=password) as pdf:
with pikepdf.open(pair.source_doc.source_path, password=password) as pdf:
filepath: Path = (
Path(tempfile.mkdtemp(dir=settings.SCRATCH_DIR))
/ f"{root_doc.id}_unprotected.pdf"
/ f"{pair.root_doc.id}_unprotected.pdf"
)
pdf.remove_unreferenced_resources()
pdf.save(filepath)
@@ -919,7 +910,7 @@ def remove_password(
if update_document:
# Create a new version rather than modifying the root/original in place.
overrides = (
DocumentMetadataOverrides().from_document(root_doc)
DocumentMetadataOverrides().from_document(pair.root_doc)
if include_metadata
else DocumentMetadataOverrides()
)
@@ -931,7 +922,7 @@ def remove_password(
"input_doc": ConsumableDocument(
source=DocumentSource.ConsumeFolder,
original_file=filepath,
root_document_id=root_doc.id,
root_document_id=pair.root_doc.id,
),
"overrides": overrides,
},
@@ -940,7 +931,7 @@ def remove_password(
else:
consume_tasks = []
overrides = (
DocumentMetadataOverrides().from_document(root_doc)
DocumentMetadataOverrides().from_document(pair.root_doc)
if include_metadata
else DocumentMetadataOverrides()
)
@@ -968,7 +959,7 @@ def remove_password(
except Exception as e:
logger.exception(
f"Error removing password from document {root_doc.id}: {e}",
f"Error removing password from document {pair.root_doc.id}: {e}",
)
raise ValueError(
f"An error occurred while removing the password: {e}",
+73 -29
View File
@@ -15,6 +15,7 @@ from time import mktime
from typing import TYPE_CHECKING
from typing import Any
from typing import Literal
from typing import NamedTuple
from unicodedata import normalize
from urllib.parse import quote
from urllib.parse import urlparse
@@ -176,6 +177,7 @@ from documents.permissions import has_system_status_permission
from documents.permissions import set_permissions_for_object
from documents.plugins.date_parsing import get_date_parser
from documents.schema import generate_object_with_permissions_schema
from documents.search import SearchHit
from documents.serialisers import AcknowledgeTasksViewSerializer
from documents.serialisers import BulkDownloadSerializer
from documents.serialisers import BulkEditObjectsSerializer
@@ -252,6 +254,7 @@ from paperless_mail.serialisers import MailRuleSerializer
if settings.AUDIT_LOG_ENABLED:
from auditlog.models import LogEntry
logger = logging.getLogger("paperless.api")
# Crossover point for intersect_and_order: below this count use a targeted
@@ -262,6 +265,25 @@ logger = logging.getLogger("paperless.api")
_TANTIVY_INTERSECT_THRESHOLD = 5_000
class SearchParams(NamedTuple):
    """Parsed search/ordering parameters extracted from the request.

    Returned by the ``parse_search_params`` helper so callers can use named
    attributes instead of unpacking a five-element tuple.
    """

    # Field to sort by, or None when no explicit ordering was requested.
    sort_field_name: str | None
    # Whether the requested ordering is descending.
    sort_reverse: bool
    # True when sorting should be delegated to the Tantivy backend rather
    # than applied via the ORM.
    use_tantivy_sort: bool
    # 1-based page number requested by the client.
    page_num: int
    # Resolved page size (request override or paginator default).
    page_size: int
class SearchResultPage(NamedTuple):
    """One page of full-text search results.

    Returned by ``run_text_search`` / ``run_more_like_this`` and consumed by
    the relevance-list pagination code.
    """

    # All matching document IDs in result order (not just the current page).
    ordered_ids: list[int]
    # Search hits (with highlights/scores) for the current page only.
    hits: list[SearchHit]
    # Zero-based offset of the first hit on this page within ordered_ids.
    page_offset: int
class ResolvedRequestDocs(NamedTuple):
    """Documents resolved for a file-serving request.

    Produced by ``DocumentViewSet._resolve_request_and_root_doc`` after the
    permission check succeeds; replaces the former anonymous two-tuple.
    """

    # The document addressed by the request's primary key.
    request_doc: Document
    # The root of that document's version chain.
    root_doc: Document
class IndexView(TemplateView):
template_name = "index.html"
@@ -1202,7 +1224,7 @@ class DocumentViewSet(
request: Request,
*,
include_deleted: bool = False,
) -> tuple[Document, Document] | HttpResponseForbidden:
) -> ResolvedRequestDocs | HttpResponseForbidden:
manager = Document.global_objects if include_deleted else Document.objects
try:
request_doc = manager.select_related(
@@ -1222,7 +1244,7 @@ class DocumentViewSet(
root_doc,
):
return HttpResponseForbidden("Insufficient permissions")
return request_doc, root_doc
return ResolvedRequestDocs(request_doc=request_doc, root_doc=root_doc)
def file_response(self, pk, request, disposition):
resolved = self._resolve_request_and_root_doc(
@@ -1232,8 +1254,11 @@ class DocumentViewSet(
)
if isinstance(resolved, HttpResponseForbidden):
return resolved
request_doc, root_doc = resolved
file_doc = self._get_effective_file_doc(request_doc, root_doc, request)
file_doc = self._get_effective_file_doc(
resolved.request_doc,
resolved.root_doc,
request,
)
return serve_file(
doc=file_doc,
use_archive=not self.original_requested(request)
@@ -1276,11 +1301,14 @@ class DocumentViewSet(
resolved = self._resolve_request_and_root_doc(pk, request)
if isinstance(resolved, HttpResponseForbidden):
return resolved
request_doc, root_doc = resolved
# Choose the effective document (newest version by default,
# or explicit via ?version=).
doc = self._get_effective_file_doc(request_doc, root_doc, request)
doc = self._get_effective_file_doc(
resolved.request_doc,
resolved.root_doc,
request,
)
document_cached_metadata = get_metadata_cache(doc.pk)
@@ -1485,10 +1513,13 @@ class DocumentViewSet(
resolved = self._resolve_request_and_root_doc(pk, request)
if isinstance(resolved, HttpResponseForbidden):
return resolved
request_doc, root_doc = resolved
try:
file_doc = self._get_effective_file_doc(request_doc, root_doc, request)
file_doc = self._get_effective_file_doc(
resolved.request_doc,
resolved.root_doc,
request,
)
return serve_file(
doc=file_doc,
@@ -1506,10 +1537,13 @@ class DocumentViewSet(
resolved = self._resolve_request_and_root_doc(pk, request)
if isinstance(resolved, HttpResponseForbidden):
return resolved
request_doc, root_doc = resolved
try:
file_doc = self._get_effective_file_doc(request_doc, root_doc, request)
file_doc = self._get_effective_file_doc(
resolved.request_doc,
resolved.root_doc,
request,
)
handle = file_doc.thumbnail_file
return FileResponse(handle, content_type="image/webp")
@@ -2191,7 +2225,7 @@ class UnifiedSearchViewSet(DocumentViewSet):
from documents.search import TantivyRelevanceList
from documents.search import get_backend
def parse_search_params() -> tuple[str | None, bool, bool, int, int]:
def parse_search_params() -> SearchParams:
"""Extract query string, search mode, and ordering from request."""
active = self._get_active_search_params(request)
if len(active) > 1:
@@ -2223,7 +2257,13 @@ class UnifiedSearchViewSet(DocumentViewSet):
self.paginator.get_page_size(request) or self.paginator.page_size
)
return sort_field_name, sort_reverse, use_tantivy_sort, page_num, page_size
return SearchParams(
sort_field_name=sort_field_name,
sort_reverse=sort_reverse,
use_tantivy_sort=use_tantivy_sort,
page_num=page_num,
page_size=page_size,
)
def intersect_and_order(
all_ids: list[int],
@@ -2255,7 +2295,7 @@ class UnifiedSearchViewSet(DocumentViewSet):
backend: TantivyBackend,
user: User | None,
filtered_qs: QuerySet[Document],
) -> tuple[list[int], list[SearchHit], int]:
) -> SearchResultPage:
"""Handle text/title/query search: IDs, ORM intersection, page highlights."""
if "text" in request.query_params:
search_mode = SearchMode.TEXT
@@ -2297,13 +2337,17 @@ class UnifiedSearchViewSet(DocumentViewSet):
search_mode=search_mode,
rank_start=page_offset + 1,
)
return ordered_ids, page_hits, page_offset
return SearchResultPage(
ordered_ids=ordered_ids,
hits=page_hits,
page_offset=page_offset,
)
def run_more_like_this(
backend: TantivyBackend,
user: User | None,
filtered_qs: QuerySet[Document],
) -> tuple[list[int], list[SearchHit], int]:
) -> SearchResultPage:
"""Handle more_like_id search: permission check, IDs, stub hits."""
try:
more_like_doc_id = int(request.query_params["more_like_id"])
@@ -2333,7 +2377,11 @@ class UnifiedSearchViewSet(DocumentViewSet):
SearchHit(id=doc_id, score=0.0, rank=rank, highlights={})
for rank, doc_id in enumerate(page_ids, start=page_offset + 1)
]
return ordered_ids, page_hits, page_offset
return SearchResultPage(
ordered_ids=ordered_ids,
hits=page_hits,
page_offset=page_offset,
)
try:
sort_field_name, sort_reverse, use_tantivy_sort, page_num, page_size = (
@@ -2345,19 +2393,15 @@ class UnifiedSearchViewSet(DocumentViewSet):
user = None if request.user.is_superuser else request.user
if "more_like_id" in request.query_params:
ordered_ids, page_hits, page_offset = run_more_like_this(
backend,
user,
filtered_qs,
)
result = run_more_like_this(backend, user, filtered_qs)
else:
ordered_ids, page_hits, page_offset = run_text_search(
backend,
user,
filtered_qs,
)
result = run_text_search(backend, user, filtered_qs)
rl = TantivyRelevanceList(ordered_ids, page_hits, page_offset)
rl = TantivyRelevanceList(
result.ordered_ids,
result.hits,
result.page_offset,
)
page = self.paginate_queryset(rl)
if page is not None:
@@ -2373,12 +2417,12 @@ class UnifiedSearchViewSet(DocumentViewSet):
# at scale (tens of thousands of matching documents).
response.data["selection_data"] = (
self._get_selection_data_for_queryset(
filtered_qs.filter(pk__in=ordered_ids),
filtered_qs.filter(pk__in=result.ordered_ids),
)
)
return response
serializer = self.get_serializer(page_hits, many=True)
serializer = self.get_serializer(result.hits, many=True)
return Response(serializer.data)
except NotFound: