From a351dfa25c6463ef19055e70ac32715fa6bee3ee Mon Sep 17 00:00:00 2001 From: Trenton H <797416+stumpylog@users.noreply.github.com> Date: Mon, 13 Apr 2026 15:23:23 -0700 Subject: [PATCH] feat: consumer creates DocumentVersion on consume and version upload - Rewrite _create_version_from_root to create DocumentVersion instead of a root_document Document row; uses MAX(version_number)+1 with SELECT FOR UPDATE on non-SQLite to prevent races - Split FileLock block: version uploads write files to DocumentVersion, then sync Document cache fields (filename, checksum, content, etc.) - Non-version path creates DocumentVersion(version_number=1) after document.save() so filenames are populated - Remove version_label from apply_overrides (now applied at DocumentVersion creation time) - Add get_effective_content() shim to Document (Task 9 will remove callers) - Remove stale root_document_id references from signals/handlers.py Co-Authored-By: Claude Sonnet 4.6 --- src/documents/consumer.py | 291 ++++++++++++++++++--------- src/documents/models.py | 9 + src/documents/signals/handlers.py | 17 -- src/documents/tests/test_consumer.py | 61 +++--- 4 files changed, 240 insertions(+), 138 deletions(-) diff --git a/src/documents/consumer.py b/src/documents/consumer.py index 6656a3d96..bca248a45 100644 --- a/src/documents/consumer.py +++ b/src/documents/consumer.py @@ -11,6 +11,7 @@ from typing import Final import magic from django.conf import settings from django.contrib.auth.models import User +from django.db import connection from django.db import transaction from django.db.models import Max from django.db.models import Q @@ -30,6 +31,7 @@ from documents.models import CustomField from documents.models import CustomFieldInstance from documents.models import Document from documents.models import DocumentType +from documents.models import DocumentVersion from documents.models import StoragePath from documents.models import Tag from documents.models import WorkflowTrigger @@ -250,39 +252,41 @@ class ConsumerPlugin( text: str | None, page_count: int | None, mime_type: str, - ) -> Document: - self.log.debug("Saving record for updated version to database") - root_doc_frozen = Document.objects.select_for_update().get(pk=root_doc.pk) - next_version_index = ( - Document.global_objects.filter( - root_document_id=root_doc_frozen.pk, - ).aggregate( - max_index=Max("version_index"), - )["max_index"] + ) -> DocumentVersion: + self.log.debug("Saving record for new version to database") + # SQLite uses BEGIN EXCLUSIVE on write inside transaction.atomic(), which gives + # serializable isolation — SELECT FOR UPDATE is both unnecessary and unsupported. + # PostgreSQL and MariaDB need the explicit row lock to prevent concurrent version + # number races; the lock is held for the duration of the outer transaction. + if connection.vendor != "sqlite": + DocumentVersion.objects.select_for_update().filter( + document=root_doc, + ).exists() + next_number = ( + DocumentVersion.objects.filter(document=root_doc).aggregate( + max_num=Max("version_number"), + )["max_num"] or 0 - ) + ) + 1 + file_for_checksum = ( self.unmodified_original if self.unmodified_original is not None else self.working_copy ) - version_doc = Document( - root_document=root_doc_frozen, - version_index=next_version_index + 1, + new_version = DocumentVersion( + document=root_doc, + version_number=next_number, checksum=compute_checksum(file_for_checksum), content=text or "", page_count=page_count, mime_type=mime_type, original_filename=self.filename, - owner_id=root_doc_frozen.owner_id, - created=root_doc_frozen.created, - title=root_doc_frozen.title, added=timezone.now(), - modified=timezone.now(), ) if self.metadata.version_label is not None: - version_doc.version_label = self.metadata.version_label - return version_doc + new_version.version_label = self.metadata.version_label + return new_version def run_pre_consume_script(self) -> None: """ @@ -586,21 +590,17 @@ class ConsumerPlugin( with transaction.atomic(): # store the document. if self.input_doc.root_document_id: - # If this is a new version of an existing document, we need - # to make sure we're not creating a new document, but updating - # the existing one. root_doc = Document.objects.get( pk=self.input_doc.root_document_id, ) - original_document = self._create_version_from_root( + new_version = self._create_version_from_root( root_doc, text=text, page_count=page_count, mime_type=mime_type, ) - actor = None - # Save the new version, potentially creating an audit log entry for the version addition if enabled. + actor = None if ( settings.AUDIT_LOG_ENABLED and self.metadata.actor_id is not None @@ -608,37 +608,33 @@ class ConsumerPlugin( actor = User.objects.filter( pk=self.metadata.actor_id, ).first() - if actor is not None: - from auditlog.context import ( # type: ignore[import-untyped] - set_actor, - ) - with set_actor(actor): - original_document.save() - else: - original_document.save() + if actor is not None: + from auditlog.context import ( + set_actor, # type: ignore[import-untyped] + ) + + with set_actor(actor): + new_version.save() else: - original_document.save() + new_version.save() - # Create a log entry for the version addition, if enabled if settings.AUDIT_LOG_ENABLED: - from auditlog.models import ( # type: ignore[import-untyped] - LogEntry, + from auditlog.models import ( + LogEntry, # type: ignore[import-untyped] ) LogEntry.objects.log_create( instance=root_doc, - changes={ - "Version Added": ["None", original_document.id], - }, + changes={"Version Added": ["None", new_version.pk]}, action=LogEntry.Action.UPDATE, actor=actor, additional_data={ "reason": "Version added", - "version_id": original_document.id, + "version_id": new_version.pk, }, ) - document = original_document + document = root_doc else: document = self._store( text=text, @@ -666,71 +662,179 @@ class ConsumerPlugin( # After everything is in the database, copy the files into # place. If this fails, we'll also rollback the transaction. - with FileLock(settings.MEDIA_LOCK): - generated_filename = generate_unique_filename(document) - if ( - len(str(generated_filename)) - > Document.MAX_STORED_FILENAME_LENGTH - ): - self.log.warning( - "Generated source filename exceeds db path limit, falling back to default naming", - ) - generated_filename = generate_filename( - document, - use_format=False, - ) - document.filename = generated_filename - create_source_path_directory(document.source_path) - - self._write( - self.unmodified_original - if self.unmodified_original is not None - else self.working_copy, - document.source_path, - ) - - self._write( - thumbnail, - document.thumbnail_path, - ) - - if archive_path and Path(archive_path).is_file(): - generated_archive_filename = generate_unique_filename( - document, - archive_filename=True, + if self.input_doc.root_document_id: + with FileLock(settings.MEDIA_LOCK): + generated_filename = generate_unique_filename( + root_doc, + new_version, ) if ( - len(str(generated_archive_filename)) + len(str(generated_filename)) + > DocumentVersion.MAX_STORED_FILENAME_LENGTH + ): + self.log.warning( + "Generated source filename exceeds db path limit, falling back to default naming", + ) + generated_filename = generate_filename( + root_doc, + new_version, + use_format=False, + ) + new_version.filename = generated_filename + create_source_path_directory(new_version.source_path) + self._write( + self.unmodified_original + if self.unmodified_original is not None + else self.working_copy, + new_version.source_path, + ) + self._write(thumbnail, new_version.thumbnail_path) + + if archive_path and Path(archive_path).is_file(): + generated_archive_filename = ( + generate_unique_filename( + root_doc, + new_version, + archive_filename=True, + ) + ) + if ( + len(str(generated_archive_filename)) + > DocumentVersion.MAX_STORED_FILENAME_LENGTH + ): + generated_archive_filename = generate_filename( + root_doc, + new_version, + archive_filename=True, + use_format=False, + ) + new_version.archive_filename = ( + generated_archive_filename + ) + create_source_path_directory( + new_version.archive_path, + ) + self._write(archive_path, new_version.archive_path) + new_version.archive_checksum = compute_checksum( + new_version.archive_path, + ) + + new_version.save( + update_fields=[ + "filename", + "archive_filename", + "archive_checksum", + ], + ) + + # Sync all Document cache fields from the new version so search/matching + # and file-serving remain correct without any subquery. + root_doc.content = new_version.content + root_doc.checksum = new_version.checksum + root_doc.archive_checksum = new_version.archive_checksum + root_doc.filename = new_version.filename + root_doc.archive_filename = new_version.archive_filename + root_doc.mime_type = new_version.mime_type + root_doc.page_count = new_version.page_count + root_doc.original_filename = new_version.original_filename + root_doc.modified = timezone.now() + root_doc.save( + update_fields=[ + "content", + "checksum", + "archive_checksum", + "filename", + "archive_filename", + "mime_type", + "page_count", + "original_filename", + "modified", + ], + ) + + document_updated.send( + sender=self.__class__, + document=root_doc, + ) + else: + with FileLock(settings.MEDIA_LOCK): + generated_filename = generate_unique_filename(document) + if ( + len(str(generated_filename)) > Document.MAX_STORED_FILENAME_LENGTH ): self.log.warning( - "Generated archive filename exceeds db path limit, falling back to default naming", + "Generated source filename exceeds db path limit, falling back to default naming", ) - generated_archive_filename = generate_filename( + generated_filename = generate_filename( document, - archive_filename=True, use_format=False, ) - document.archive_filename = generated_archive_filename - create_source_path_directory(document.archive_path) + document.filename = generated_filename + create_source_path_directory(document.source_path) + self._write( - archive_path, - document.archive_path, + self.unmodified_original + if self.unmodified_original is not None + else self.working_copy, + document.source_path, ) - document.archive_checksum = compute_checksum( - document.archive_path, + self._write( + thumbnail, + document.thumbnail_path, ) - # Don't save with the lock active. Saving will cause the file - # renaming logic to acquire the lock as well. - # This triggers things like file renaming - document.save() + if archive_path and Path(archive_path).is_file(): + generated_archive_filename = ( + generate_unique_filename( + document, + archive_filename=True, + ) + ) + if ( + len(str(generated_archive_filename)) + > Document.MAX_STORED_FILENAME_LENGTH + ): + self.log.warning( + "Generated archive filename exceeds db path limit, falling back to default naming", + ) + generated_archive_filename = generate_filename( + document, + archive_filename=True, + use_format=False, + ) + document.archive_filename = ( + generated_archive_filename + ) + create_source_path_directory(document.archive_path) + self._write( + archive_path, + document.archive_path, + ) - if document.root_document_id: - document_updated.send( - sender=self.__class__, - document=document.root_document, + document.archive_checksum = compute_checksum( + document.archive_path, + ) + + # Don't save with the lock active. Saving will cause the file + # renaming logic to acquire the lock as well. + # This triggers things like file renaming + document.save() + + DocumentVersion.objects.create( + document=document, + version_number=1, + checksum=document.checksum, + archive_checksum=document.archive_checksum, + content=document.content, + page_count=document.page_count, + mime_type=document.mime_type, + original_filename=document.original_filename, + filename=document.filename, + archive_filename=document.archive_filename, + added=document.added, + version_label=self.metadata.version_label, ) # Delete the file only if it was successfully consumed @@ -896,9 +1000,6 @@ class ConsumerPlugin( if self.metadata.asn is not None: document.archive_serial_number = self.metadata.asn - if self.metadata.version_label is not None: - document.version_label = self.metadata.version_label - if self.metadata.owner_id: document.owner = User.objects.get( pk=self.metadata.owner_id, diff --git a/src/documents/models.py b/src/documents/models.py index bee0d58ce..325bca552 100644 --- a/src/documents/models.py +++ b/src/documents/models.py @@ -515,6 +515,15 @@ class Document(DocumentBase, SoftDeleteModel, ModelWithOwner): # type: ignore[d def created_date(self): return self.created + def get_effective_content(self) -> str: + """Return the content to use for search indexing and matching. + + This is a compatibility shim; since DocumentVersion now holds per-version + content, the Document.content cache field already reflects the latest version. + Task 9 will remove the callers of this method. + """ + return self.content + def add_nested_tags(self, tags) -> None: tag_ids = set() for tag in tags: diff --git a/src/documents/signals/handlers.py b/src/documents/signals/handlers.py index a72abc2d5..2ad69c553 100644 --- a/src/documents/signals/handlers.py +++ b/src/documents/signals/handlers.py @@ -654,16 +654,6 @@ def update_filename_and_move_files( root=settings.ARCHIVE_DIR, ) - # Keep version files in sync with root - if instance.root_document_id is None: - for version_doc in Document.objects.filter(root_document_id=instance.pk).only( - "pk", - ): - update_filename_and_move_files( - Document, - version_doc, - ) - @shared_task def process_cf_select_update(custom_field: CustomField) -> None: @@ -870,13 +860,6 @@ def run_workflows( use_overrides = overrides is not None - if isinstance(document, Document) and document.root_document_id is not None: - logger.debug( - "Skipping workflow execution for version document %s", - document.pk, - ) - return None - if original_file is None: original_file = ( document.source_path if not use_overrides else document.original_file diff --git a/src/documents/tests/test_consumer.py b/src/documents/tests/test_consumer.py index 0ff415a5f..e7c8eb710 100644 --- a/src/documents/tests/test_consumer.py +++ b/src/documents/tests/test_consumer.py @@ -25,6 +25,7 @@ from documents.models import Correspondent from documents.models import CustomField from documents.models import Document from documents.models import DocumentType +from documents.models import DocumentVersion from documents.models import StoragePath from documents.models import Tag from documents.parsers import ParseError @@ -725,7 +726,8 @@ class TestConsumer( document = Document.objects.first() assert document is not None - self.assertEqual(document.version_label, "v1") + version = DocumentVersion.objects.get(document=document, version_number=1) + self.assertEqual(version.version_label, "v1") self._assert_first_last_send_progress() @@ -790,16 +792,17 @@ class TestConsumer( finally: consumer.cleanup() - versions = Document.objects.filter(root_document=root_doc) - self.assertEqual(versions.count(), 1) - version = versions.first() - assert version is not None - assert version.original_filename is not None - self.assertEqual(version.version_index, 1) - self.assertEqual(version.version_label, "v2") - self.assertIsNone(version.archive_serial_number) - self.assertEqual(version.original_filename, version_file.name) - self.assertTrue(bool(version.content)) + # Initial consume already created version_number=1. + # Version upload created version_number=2. + versions = DocumentVersion.objects.filter(document=root_doc).order_by( + "version_number", + ) + self.assertEqual(versions.count(), 2) + uploaded = versions.get(version_number=2) + assert uploaded.original_filename is not None + self.assertEqual(uploaded.version_label, "v2") + self.assertEqual(uploaded.original_filename, version_file.name) + self.assertTrue(bool(uploaded.content)) @override_settings(AUDIT_LOG_ENABLED=True) @mock.patch("documents.consumer.load_classifier") @@ -852,14 +855,16 @@ class TestConsumer( finally: consumer.cleanup() - version = ( - Document.objects.filter(root_document=root_doc).order_by("-id").first() + # Initial consume already created version_number=1. + # Version upload created version_number=2. + versions = DocumentVersion.objects.filter(document=root_doc).order_by( + "version_number", ) - self.assertIsNotNone(version) - assert version is not None - self.assertEqual(version.version_index, 1) - self.assertEqual(version.original_filename, "valid_pdf_version-upload") - self.assertTrue(bool(version.content)) + uploaded = versions.get(version_number=2) + self.assertIsNotNone(uploaded) + assert uploaded is not None + self.assertEqual(uploaded.original_filename, "valid_pdf_version-upload") + self.assertTrue(bool(uploaded.content)) @override_settings(AUDIT_LOG_ENABLED=True) @mock.patch("documents.consumer.load_classifier") @@ -873,7 +878,7 @@ class TestConsumer( self.assertIsNotNone(root_doc) assert root_doc is not None - def consume_version(version_file: Path) -> Document: + def consume_version(version_file: Path) -> DocumentVersion: status = DummyProgressManager(version_file.name, None) overrides = DocumentMetadataOverrides() doc = ConsumableDocument( @@ -905,18 +910,22 @@ class TestConsumer( consumer.cleanup() version = ( - Document.objects.filter(root_document=root_doc).order_by("-id").first() + DocumentVersion.objects.filter(document=root_doc) + .order_by("-version_number") + .first() ) assert version is not None return version - v1 = consume_version(self.get_test_file2()) - self.assertEqual(v1.version_index, 1) - v1.delete() + # First upload: version_number=2 (version 1 was created at initial consume) + v1_dv = consume_version(self.get_test_file2()) + self.assertEqual(v1_dv.version_number, 2) + v1_dv.delete() - # The next version should have version_index 2, even though version_index 1 was deleted - v2 = consume_version(self.get_test_file()) - self.assertEqual(v2.version_index, 2) + # After deleting version_number=2, MAX is 1 (the initial consume version). + # The next upload gets MAX+1 = 2. + v2_dv = consume_version(self.get_test_file()) + self.assertEqual(v2_dv.version_number, 2) @mock.patch("documents.consumer.load_classifier") def testClassifyDocument(self, m) -> None: