From 09d1f09734ed93e1129b93cd8848c0d409fa5851 Mon Sep 17 00:00:00 2001 From: Trenton Holmes <797416+stumpylog@users.noreply.github.com> Date: Sat, 2 May 2026 20:00:10 -0700 Subject: [PATCH] use a few named tuples and data classes instead of so much unpacking --- src/documents/bulk_edit.py | 119 +++++++++++++++++-------------------- src/documents/views.py | 102 ++++++++++++++++++++++--------- 2 files changed, 128 insertions(+), 93 deletions(-) diff --git a/src/documents/bulk_edit.py b/src/documents/bulk_edit.py index 65ce3c785..36c6b7041 100644 --- a/src/documents/bulk_edit.py +++ b/src/documents/bulk_edit.py @@ -5,6 +5,7 @@ import tempfile from pathlib import Path from typing import TYPE_CHECKING from typing import Literal +from typing import NamedTuple from celery import chord from celery import group @@ -46,6 +47,11 @@ class SourceModeChoices: EXPLICIT_SELECTION: SourceMode = "explicit_selection" +class ResolvedDocPair(NamedTuple): + root_doc: Document + source_doc: Document + + @shared_task(bind=True) def restore_archive_serial_numbers_task( self, @@ -86,17 +92,20 @@ def _resolve_root_and_source_doc( doc: Document, *, source_mode: SourceMode = SourceModeChoices.LATEST_VERSION, -) -> tuple[Document, Document]: +) -> ResolvedDocPair: root_doc = get_root_document(doc) if source_mode == SourceModeChoices.EXPLICIT_SELECTION: - return root_doc, doc + return ResolvedDocPair(root_doc=root_doc, source_doc=doc) # Version IDs are explicit by default, only a selected root resolves to latest if doc.root_document_id is not None: - return root_doc, doc + return ResolvedDocPair(root_doc=root_doc, source_doc=doc) - return root_doc, get_latest_version_for_root(root_doc) + return ResolvedDocPair( + root_doc=root_doc, + source_doc=get_latest_version_for_root(root_doc), + ) def set_correspondent( @@ -442,39 +451,36 @@ def rotate( id__in=doc_ids, ) } - docs_by_root_id: dict[int, tuple[Document, Document]] = {} + docs_by_root_id: dict[int, ResolvedDocPair] = {} for doc_id in doc_ids: doc = docs_by_id.get(doc_id) if doc is None: continue - root_doc, source_doc = _resolve_root_and_source_doc( - doc, - source_mode=source_mode, - ) - docs_by_root_id.setdefault(root_doc.id, (root_doc, source_doc)) + pair = _resolve_root_and_source_doc(doc, source_mode=source_mode) + docs_by_root_id.setdefault(pair.root_doc.id, pair) import pikepdf - for root_doc, source_doc in docs_by_root_id.values(): - if source_doc.mime_type != "application/pdf": + for pair in docs_by_root_id.values(): + if pair.source_doc.mime_type != "application/pdf": logger.warning( - f"Document {root_doc.id} is not a PDF, skipping rotation.", + f"Document {pair.root_doc.id} is not a PDF, skipping rotation.", ) continue try: # Write rotated output to a temp file and create a new version via consume pipeline filepath: Path = ( Path(tempfile.mkdtemp(dir=settings.SCRATCH_DIR)) - / f"{root_doc.id}_rotated.pdf" + / f"{pair.root_doc.id}_rotated.pdf" ) - with pikepdf.open(source_doc.source_path) as pdf: + with pikepdf.open(pair.source_doc.source_path) as pdf: for page in pdf.pages: page.rotate(degrees, relative=True) pdf.remove_unreferenced_resources() pdf.save(filepath) # Preserve metadata/permissions via overrides; mark as new version - overrides = DocumentMetadataOverrides().from_document(root_doc) + overrides = DocumentMetadataOverrides().from_document(pair.root_doc) if user is not None: overrides.actor_id = user.id @@ -483,17 +489,17 @@ def rotate( "input_doc": ConsumableDocument( source=DocumentSource.ConsumeFolder, original_file=filepath, - root_document_id=root_doc.id, + root_document_id=pair.root_doc.id, ), "overrides": overrides, }, headers={"trigger_source": trigger_source}, ) logger.info( - f"Queued new rotated version for document {root_doc.id} by {degrees} degrees", + f"Queued new rotated version for document {pair.root_doc.id} by {degrees} degrees", ) except Exception as e: - logger.exception(f"Error rotating document {root_doc.id}: {e}") + logger.exception(f"Error rotating document {pair.root_doc.id}: {e}") return "OK" @@ -524,17 +530,14 @@ def merge( doc = docs_by_id.get(doc_id) if doc is None: continue - _, source_doc = _resolve_root_and_source_doc( - doc, - source_mode=source_mode, - ) + pair = _resolve_root_and_source_doc(doc, source_mode=source_mode) try: doc_path = ( - source_doc.archive_path + pair.source_doc.archive_path if archive_fallback - and source_doc.mime_type != "application/pdf" - and source_doc.has_archive_version - else source_doc.source_path + and pair.source_doc.mime_type != "application/pdf" + and pair.source_doc.has_archive_version + else pair.source_doc.source_path ) with pikepdf.open(str(doc_path)) as pdf: version = max(version, pdf.pdf_version) @@ -624,16 +627,13 @@ def split( f"Attempting to split document {doc_ids[0]} into {len(pages)} documents", ) doc = Document.objects.select_related("root_document").get(id=doc_ids[0]) - _, source_doc = _resolve_root_and_source_doc( - doc, - source_mode=source_mode, - ) + pair = _resolve_root_and_source_doc(doc, source_mode=source_mode) import pikepdf consume_tasks = [] try: - with pikepdf.open(source_doc.source_path) as pdf: + with pikepdf.open(pair.source_doc.source_path) as pdf: for idx, split_doc in enumerate(pages): dst: pikepdf.Pdf = pikepdf.new() for page in split_doc: @@ -705,10 +705,7 @@ def delete_pages( f"Attempting to delete pages {pages} from {len(doc_ids)} documents", ) doc = Document.objects.select_related("root_document").get(id=doc_ids[0]) - root_doc, source_doc = _resolve_root_and_source_doc( - doc, - source_mode=source_mode, - ) + pair = _resolve_root_and_source_doc(doc, source_mode=source_mode) pages = sorted(pages) # sort pages to avoid index issues import pikepdf @@ -716,9 +713,9 @@ def delete_pages( # Produce edited PDF to a temp file and create a new version filepath: Path = ( Path(tempfile.mkdtemp(dir=settings.SCRATCH_DIR)) - / f"{root_doc.id}_pages_deleted.pdf" + / f"{pair.root_doc.id}_pages_deleted.pdf" ) - with pikepdf.open(source_doc.source_path) as pdf: + with pikepdf.open(pair.source_doc.source_path) as pdf: offset = 1 # pages are 1-indexed for page_num in pages: pdf.pages.remove(pdf.pages[page_num - offset]) @@ -726,7 +723,7 @@ def delete_pages( pdf.remove_unreferenced_resources() pdf.save(filepath) - overrides = DocumentMetadataOverrides().from_document(root_doc) + overrides = DocumentMetadataOverrides().from_document(pair.root_doc) if user is not None: overrides.actor_id = user.id consume_file.apply_async( @@ -734,17 +731,17 @@ def delete_pages( "input_doc": ConsumableDocument( source=DocumentSource.ConsumeFolder, original_file=filepath, - root_document_id=root_doc.id, + root_document_id=pair.root_doc.id, ), "overrides": overrides, }, headers={"trigger_source": trigger_source}, ) logger.info( - f"Queued new version for document {root_doc.id} after deleting pages {pages}", + f"Queued new version for document {pair.root_doc.id} after deleting pages {pages}", ) except Exception as e: - logger.exception(f"Error deleting pages from document {root_doc.id}: {e}") + logger.exception(f"Error deleting pages from document {pair.root_doc.id}: {e}") return "OK" @@ -772,16 +769,13 @@ def edit_pdf( f"Editing PDF of document {doc_ids[0]} with {len(operations)} operations", ) doc = Document.objects.select_related("root_document").get(id=doc_ids[0]) - root_doc, source_doc = _resolve_root_and_source_doc( - doc, - source_mode=source_mode, - ) + pair = _resolve_root_and_source_doc(doc, source_mode=source_mode) import pikepdf pdf_docs: list[pikepdf.Pdf] = [] try: - with pikepdf.open(source_doc.source_path) as src: + with pikepdf.open(pair.source_doc.source_path) as src: # prepare output documents max_idx = max(op.get("doc", 0) for op in operations) pdf_docs = [pikepdf.new() for _ in range(max_idx + 1)] @@ -805,11 +799,11 @@ def edit_pdf( pdf.remove_unreferenced_resources() filepath: Path = ( Path(tempfile.mkdtemp(dir=settings.SCRATCH_DIR)) - / f"{root_doc.id}_edited.pdf" + / f"{pair.root_doc.id}_edited.pdf" ) pdf.save(filepath) overrides = ( - DocumentMetadataOverrides().from_document(root_doc) + DocumentMetadataOverrides().from_document(pair.root_doc) if include_metadata else DocumentMetadataOverrides() ) @@ -821,7 +815,7 @@ def edit_pdf( "input_doc": ConsumableDocument( source=DocumentSource.ConsumeFolder, original_file=filepath, - root_document_id=root_doc.id, + root_document_id=pair.root_doc.id, ), "overrides": overrides, }, @@ -830,7 +824,7 @@ def edit_pdf( else: consume_tasks = [] overrides = ( - DocumentMetadataOverrides().from_document(root_doc) + DocumentMetadataOverrides().from_document(pair.root_doc) if include_metadata else DocumentMetadataOverrides() ) @@ -840,11 +834,11 @@ def edit_pdf( if not delete_original: overrides.skip_asn_if_exists = True if delete_original and len(pdf_docs) == 1: - overrides.asn = root_doc.archive_serial_number + overrides.asn = pair.root_doc.archive_serial_number for idx, pdf in enumerate(pdf_docs, start=1): version_filepath: Path = ( Path(tempfile.mkdtemp(dir=settings.SCRATCH_DIR)) - / f"{root_doc.id}_edit_{idx}.pdf" + / f"{pair.root_doc.id}_edit_{idx}.pdf" ) pdf.remove_unreferenced_resources() pdf.save(version_filepath) @@ -874,7 +868,7 @@ def edit_pdf( group(consume_tasks).delay() except Exception as e: - logger.exception(f"Error editing document {root_doc.id}: {e}") + logger.exception(f"Error editing document {pair.root_doc.id}: {e}") raise ValueError( f"An error occurred while editing the document: {e}", ) from e @@ -900,18 +894,15 @@ def remove_password( for doc_id in doc_ids: doc = Document.objects.select_related("root_document").get(id=doc_id) - root_doc, source_doc = _resolve_root_and_source_doc( - doc, - source_mode=source_mode, - ) + pair = _resolve_root_and_source_doc(doc, source_mode=source_mode) try: logger.info( f"Attempting password removal from document {doc_ids[0]}", ) - with pikepdf.open(source_doc.source_path, password=password) as pdf: + with pikepdf.open(pair.source_doc.source_path, password=password) as pdf: filepath: Path = ( Path(tempfile.mkdtemp(dir=settings.SCRATCH_DIR)) - / f"{root_doc.id}_unprotected.pdf" + / f"{pair.root_doc.id}_unprotected.pdf" ) pdf.remove_unreferenced_resources() pdf.save(filepath) @@ -919,7 +910,7 @@ def remove_password( if update_document: # Create a new version rather than modifying the root/original in place. overrides = ( - DocumentMetadataOverrides().from_document(root_doc) + DocumentMetadataOverrides().from_document(pair.root_doc) if include_metadata else DocumentMetadataOverrides() ) @@ -931,7 +922,7 @@ def remove_password( "input_doc": ConsumableDocument( source=DocumentSource.ConsumeFolder, original_file=filepath, - root_document_id=root_doc.id, + root_document_id=pair.root_doc.id, ), "overrides": overrides, }, @@ -940,7 +931,7 @@ def remove_password( else: consume_tasks = [] overrides = ( - DocumentMetadataOverrides().from_document(root_doc) + DocumentMetadataOverrides().from_document(pair.root_doc) if include_metadata else DocumentMetadataOverrides() ) @@ -968,7 +959,7 @@ def remove_password( except Exception as e: logger.exception( - f"Error removing password from document {root_doc.id}: {e}", + f"Error removing password from document {pair.root_doc.id}: {e}", ) raise ValueError( f"An error occurred while removing the password: {e}", diff --git a/src/documents/views.py b/src/documents/views.py index eb0f8b8f0..e442b812d 100644 --- a/src/documents/views.py +++ b/src/documents/views.py @@ -15,6 +15,7 @@ from time import mktime from typing import TYPE_CHECKING from typing import Any from typing import Literal +from typing import NamedTuple from unicodedata import normalize from urllib.parse import quote from urllib.parse import urlparse @@ -176,6 +177,7 @@ from documents.permissions import has_system_status_permission from documents.permissions import set_permissions_for_object from documents.plugins.date_parsing import get_date_parser from documents.schema import generate_object_with_permissions_schema +from documents.search import SearchHit from documents.serialisers import AcknowledgeTasksViewSerializer from documents.serialisers import BulkDownloadSerializer from documents.serialisers import BulkEditObjectsSerializer @@ -252,6 +254,7 @@ from paperless_mail.serialisers import MailRuleSerializer if settings.AUDIT_LOG_ENABLED: from auditlog.models import LogEntry + logger = logging.getLogger("paperless.api") # Crossover point for intersect_and_order: below this count use a targeted @@ -262,6 +265,25 @@ logger = logging.getLogger("paperless.api") _TANTIVY_INTERSECT_THRESHOLD = 5_000 +class SearchParams(NamedTuple): + sort_field_name: str | None + sort_reverse: bool + use_tantivy_sort: bool + page_num: int + page_size: int + + +class SearchResultPage(NamedTuple): + ordered_ids: list[int] + hits: list[SearchHit] + page_offset: int + + +class ResolvedRequestDocs(NamedTuple): + request_doc: Document + root_doc: Document + + class IndexView(TemplateView): template_name = "index.html" @@ -1202,7 +1224,7 @@ class DocumentViewSet( request: Request, *, include_deleted: bool = False, - ) -> tuple[Document, Document] | HttpResponseForbidden: + ) -> ResolvedRequestDocs | HttpResponseForbidden: manager = Document.global_objects if include_deleted else Document.objects try: request_doc = manager.select_related( @@ -1222,7 +1244,7 @@ class DocumentViewSet( root_doc, ): return HttpResponseForbidden("Insufficient permissions") - return request_doc, root_doc + return ResolvedRequestDocs(request_doc=request_doc, root_doc=root_doc) def file_response(self, pk, request, disposition): resolved = self._resolve_request_and_root_doc( @@ -1232,8 +1254,11 @@ class DocumentViewSet( ) if isinstance(resolved, HttpResponseForbidden): return resolved - request_doc, root_doc = resolved - file_doc = self._get_effective_file_doc(request_doc, root_doc, request) + file_doc = self._get_effective_file_doc( + resolved.request_doc, + resolved.root_doc, + request, + ) return serve_file( doc=file_doc, use_archive=not self.original_requested(request) @@ -1276,11 +1301,14 @@ class DocumentViewSet( resolved = self._resolve_request_and_root_doc(pk, request) if isinstance(resolved, HttpResponseForbidden): return resolved - request_doc, root_doc = resolved # Choose the effective document (newest version by default, # or explicit via ?version=). - doc = self._get_effective_file_doc(request_doc, root_doc, request) + doc = self._get_effective_file_doc( + resolved.request_doc, + resolved.root_doc, + request, + ) document_cached_metadata = get_metadata_cache(doc.pk) @@ -1485,10 +1513,13 @@ class DocumentViewSet( resolved = self._resolve_request_and_root_doc(pk, request) if isinstance(resolved, HttpResponseForbidden): return resolved - request_doc, root_doc = resolved try: - file_doc = self._get_effective_file_doc(request_doc, root_doc, request) + file_doc = self._get_effective_file_doc( + resolved.request_doc, + resolved.root_doc, + request, + ) return serve_file( doc=file_doc, @@ -1506,10 +1537,13 @@ class DocumentViewSet( resolved = self._resolve_request_and_root_doc(pk, request) if isinstance(resolved, HttpResponseForbidden): return resolved - request_doc, root_doc = resolved try: - file_doc = self._get_effective_file_doc(request_doc, root_doc, request) + file_doc = self._get_effective_file_doc( + resolved.request_doc, + resolved.root_doc, + request, + ) handle = file_doc.thumbnail_file return FileResponse(handle, content_type="image/webp") @@ -2191,7 +2225,7 @@ class UnifiedSearchViewSet(DocumentViewSet): from documents.search import TantivyRelevanceList from documents.search import get_backend - def parse_search_params() -> tuple[str | None, bool, bool, int, int]: + def parse_search_params() -> SearchParams: """Extract query string, search mode, and ordering from request.""" active = self._get_active_search_params(request) if len(active) > 1: @@ -2223,7 +2257,13 @@ class UnifiedSearchViewSet(DocumentViewSet): self.paginator.get_page_size(request) or self.paginator.page_size ) - return sort_field_name, sort_reverse, use_tantivy_sort, page_num, page_size + return SearchParams( + sort_field_name=sort_field_name, + sort_reverse=sort_reverse, + use_tantivy_sort=use_tantivy_sort, + page_num=page_num, + page_size=page_size, + ) def intersect_and_order( all_ids: list[int], @@ -2255,7 +2295,7 @@ class UnifiedSearchViewSet(DocumentViewSet): backend: TantivyBackend, user: User | None, filtered_qs: QuerySet[Document], - ) -> tuple[list[int], list[SearchHit], int]: + ) -> SearchResultPage: """Handle text/title/query search: IDs, ORM intersection, page highlights.""" if "text" in request.query_params: search_mode = SearchMode.TEXT @@ -2297,13 +2337,17 @@ class UnifiedSearchViewSet(DocumentViewSet): search_mode=search_mode, rank_start=page_offset + 1, ) - return ordered_ids, page_hits, page_offset + return SearchResultPage( + ordered_ids=ordered_ids, + hits=page_hits, + page_offset=page_offset, + ) def run_more_like_this( backend: TantivyBackend, user: User | None, filtered_qs: QuerySet[Document], - ) -> tuple[list[int], list[SearchHit], int]: + ) -> SearchResultPage: """Handle more_like_id search: permission check, IDs, stub hits.""" try: more_like_doc_id = int(request.query_params["more_like_id"]) @@ -2333,7 +2377,11 @@ class UnifiedSearchViewSet(DocumentViewSet): SearchHit(id=doc_id, score=0.0, rank=rank, highlights={}) for rank, doc_id in enumerate(page_ids, start=page_offset + 1) ] - return ordered_ids, page_hits, page_offset + return SearchResultPage( + ordered_ids=ordered_ids, + hits=page_hits, + page_offset=page_offset, + ) try: sort_field_name, sort_reverse, use_tantivy_sort, page_num, page_size = ( @@ -2345,19 +2393,15 @@ class UnifiedSearchViewSet(DocumentViewSet): user = None if request.user.is_superuser else request.user if "more_like_id" in request.query_params: - ordered_ids, page_hits, page_offset = run_more_like_this( - backend, - user, - filtered_qs, - ) + result = run_more_like_this(backend, user, filtered_qs) else: - ordered_ids, page_hits, page_offset = run_text_search( - backend, - user, - filtered_qs, - ) + result = run_text_search(backend, user, filtered_qs) - rl = TantivyRelevanceList(ordered_ids, page_hits, page_offset) + rl = TantivyRelevanceList( + result.ordered_ids, + result.hits, + result.page_offset, + ) page = self.paginate_queryset(rl) if page is not None: @@ -2373,12 +2417,12 @@ class UnifiedSearchViewSet(DocumentViewSet): # at scale (tens of thousands of matching documents). response.data["selection_data"] = ( self._get_selection_data_for_queryset( - filtered_qs.filter(pk__in=ordered_ids), + filtered_qs.filter(pk__in=result.ordered_ids), ) ) return response - serializer = self.get_serializer(page_hits, many=True) + serializer = self.get_serializer(result.hits, many=True) return Response(serializer.data) except NotFound: