feat: consumer creates DocumentVersion on consume and version upload

- Rewrite _create_version_from_root to create DocumentVersion instead of
  a root_document Document row; uses MAX(version_number)+1 with
  SELECT FOR UPDATE on non-SQLite to prevent races
- Split FileLock block: version uploads write files to DocumentVersion,
  then sync Document cache fields (filename, checksum, content, etc.)
- Non-version path creates DocumentVersion(version_number=1) after
  document.save() so filenames are populated
- Remove version_label from apply_overrides (now applied at
  DocumentVersion creation time)
- Add get_effective_content() shim to Document (Task 9 will remove callers)
- Remove stale root_document_id references from signals/handlers.py

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Trenton H
2026-04-13 15:23:23 -07:00
parent 54f44c6c05
commit a351dfa25c
4 changed files with 240 additions and 138 deletions
+196 -95
View File
@@ -11,6 +11,7 @@ from typing import Final
import magic
from django.conf import settings
from django.contrib.auth.models import User
from django.db import connection
from django.db import transaction
from django.db.models import Max
from django.db.models import Q
@@ -30,6 +31,7 @@ from documents.models import CustomField
from documents.models import CustomFieldInstance
from documents.models import Document
from documents.models import DocumentType
from documents.models import DocumentVersion
from documents.models import StoragePath
from documents.models import Tag
from documents.models import WorkflowTrigger
@@ -250,39 +252,41 @@ class ConsumerPlugin(
text: str | None,
page_count: int | None,
mime_type: str,
) -> Document:
self.log.debug("Saving record for updated version to database")
root_doc_frozen = Document.objects.select_for_update().get(pk=root_doc.pk)
next_version_index = (
Document.global_objects.filter(
root_document_id=root_doc_frozen.pk,
).aggregate(
max_index=Max("version_index"),
)["max_index"]
) -> DocumentVersion:
self.log.debug("Saving record for new version to database")
# SQLite uses BEGIN EXCLUSIVE on write inside transaction.atomic(), which gives
# serializable isolation — SELECT FOR UPDATE is both unnecessary and unsupported.
# PostgreSQL and MariaDB need the explicit row lock to prevent concurrent version
# number races; the lock is held for the duration of the outer transaction.
if connection.vendor != "sqlite":
DocumentVersion.objects.select_for_update().filter(
document=root_doc,
).exists()
next_number = (
DocumentVersion.objects.filter(document=root_doc).aggregate(
max_num=Max("version_number"),
)["max_num"]
or 0
)
) + 1
file_for_checksum = (
self.unmodified_original
if self.unmodified_original is not None
else self.working_copy
)
version_doc = Document(
root_document=root_doc_frozen,
version_index=next_version_index + 1,
new_version = DocumentVersion(
document=root_doc,
version_number=next_number,
checksum=compute_checksum(file_for_checksum),
content=text or "",
page_count=page_count,
mime_type=mime_type,
original_filename=self.filename,
owner_id=root_doc_frozen.owner_id,
created=root_doc_frozen.created,
title=root_doc_frozen.title,
added=timezone.now(),
modified=timezone.now(),
)
if self.metadata.version_label is not None:
version_doc.version_label = self.metadata.version_label
return version_doc
new_version.version_label = self.metadata.version_label
return new_version
def run_pre_consume_script(self) -> None:
"""
@@ -586,21 +590,17 @@ class ConsumerPlugin(
with transaction.atomic():
# store the document.
if self.input_doc.root_document_id:
# If this is a new version of an existing document, we need
# to make sure we're not creating a new document, but updating
# the existing one.
root_doc = Document.objects.get(
pk=self.input_doc.root_document_id,
)
original_document = self._create_version_from_root(
new_version = self._create_version_from_root(
root_doc,
text=text,
page_count=page_count,
mime_type=mime_type,
)
actor = None
# Save the new version, potentially creating an audit log entry for the version addition if enabled.
actor = None
if (
settings.AUDIT_LOG_ENABLED
and self.metadata.actor_id is not None
@@ -608,37 +608,33 @@ class ConsumerPlugin(
actor = User.objects.filter(
pk=self.metadata.actor_id,
).first()
if actor is not None:
from auditlog.context import ( # type: ignore[import-untyped]
set_actor,
)
with set_actor(actor):
original_document.save()
else:
original_document.save()
if actor is not None:
from auditlog.context import (
set_actor, # type: ignore[import-untyped]
)
with set_actor(actor):
new_version.save()
else:
original_document.save()
new_version.save()
# Create a log entry for the version addition, if enabled
if settings.AUDIT_LOG_ENABLED:
from auditlog.models import ( # type: ignore[import-untyped]
LogEntry,
from auditlog.models import (
LogEntry, # type: ignore[import-untyped]
)
LogEntry.objects.log_create(
instance=root_doc,
changes={
"Version Added": ["None", original_document.id],
},
changes={"Version Added": ["None", new_version.pk]},
action=LogEntry.Action.UPDATE,
actor=actor,
additional_data={
"reason": "Version added",
"version_id": original_document.id,
"version_id": new_version.pk,
},
)
document = original_document
document = root_doc
else:
document = self._store(
text=text,
@@ -666,71 +662,179 @@ class ConsumerPlugin(
# After everything is in the database, copy the files into
# place. If this fails, we'll also rollback the transaction.
with FileLock(settings.MEDIA_LOCK):
generated_filename = generate_unique_filename(document)
if (
len(str(generated_filename))
> Document.MAX_STORED_FILENAME_LENGTH
):
self.log.warning(
"Generated source filename exceeds db path limit, falling back to default naming",
)
generated_filename = generate_filename(
document,
use_format=False,
)
document.filename = generated_filename
create_source_path_directory(document.source_path)
self._write(
self.unmodified_original
if self.unmodified_original is not None
else self.working_copy,
document.source_path,
)
self._write(
thumbnail,
document.thumbnail_path,
)
if archive_path and Path(archive_path).is_file():
generated_archive_filename = generate_unique_filename(
document,
archive_filename=True,
if self.input_doc.root_document_id:
with FileLock(settings.MEDIA_LOCK):
generated_filename = generate_unique_filename(
root_doc,
new_version,
)
if (
len(str(generated_archive_filename))
len(str(generated_filename))
> DocumentVersion.MAX_STORED_FILENAME_LENGTH
):
self.log.warning(
"Generated source filename exceeds db path limit, falling back to default naming",
)
generated_filename = generate_filename(
root_doc,
new_version,
use_format=False,
)
new_version.filename = generated_filename
create_source_path_directory(new_version.source_path)
self._write(
self.unmodified_original
if self.unmodified_original is not None
else self.working_copy,
new_version.source_path,
)
self._write(thumbnail, new_version.thumbnail_path)
if archive_path and Path(archive_path).is_file():
generated_archive_filename = (
generate_unique_filename(
root_doc,
new_version,
archive_filename=True,
)
)
if (
len(str(generated_archive_filename))
> DocumentVersion.MAX_STORED_FILENAME_LENGTH
):
generated_archive_filename = generate_filename(
root_doc,
new_version,
archive_filename=True,
use_format=False,
)
new_version.archive_filename = (
generated_archive_filename
)
create_source_path_directory(
new_version.archive_path,
)
self._write(archive_path, new_version.archive_path)
new_version.archive_checksum = compute_checksum(
new_version.archive_path,
)
new_version.save(
update_fields=[
"filename",
"archive_filename",
"archive_checksum",
],
)
# Sync all Document cache fields from the new version so search/matching
# and file-serving remain correct without any subquery.
root_doc.content = new_version.content
root_doc.checksum = new_version.checksum
root_doc.archive_checksum = new_version.archive_checksum
root_doc.filename = new_version.filename
root_doc.archive_filename = new_version.archive_filename
root_doc.mime_type = new_version.mime_type
root_doc.page_count = new_version.page_count
root_doc.original_filename = new_version.original_filename
root_doc.modified = timezone.now()
root_doc.save(
update_fields=[
"content",
"checksum",
"archive_checksum",
"filename",
"archive_filename",
"mime_type",
"page_count",
"original_filename",
"modified",
],
)
document_updated.send(
sender=self.__class__,
document=root_doc,
)
else:
with FileLock(settings.MEDIA_LOCK):
generated_filename = generate_unique_filename(document)
if (
len(str(generated_filename))
> Document.MAX_STORED_FILENAME_LENGTH
):
self.log.warning(
"Generated archive filename exceeds db path limit, falling back to default naming",
"Generated source filename exceeds db path limit, falling back to default naming",
)
generated_archive_filename = generate_filename(
generated_filename = generate_filename(
document,
archive_filename=True,
use_format=False,
)
document.archive_filename = generated_archive_filename
create_source_path_directory(document.archive_path)
document.filename = generated_filename
create_source_path_directory(document.source_path)
self._write(
archive_path,
document.archive_path,
self.unmodified_original
if self.unmodified_original is not None
else self.working_copy,
document.source_path,
)
document.archive_checksum = compute_checksum(
document.archive_path,
self._write(
thumbnail,
document.thumbnail_path,
)
# Don't save with the lock active. Saving will cause the file
# renaming logic to acquire the lock as well.
# This triggers things like file renaming
document.save()
if archive_path and Path(archive_path).is_file():
generated_archive_filename = (
generate_unique_filename(
document,
archive_filename=True,
)
)
if (
len(str(generated_archive_filename))
> Document.MAX_STORED_FILENAME_LENGTH
):
self.log.warning(
"Generated archive filename exceeds db path limit, falling back to default naming",
)
generated_archive_filename = generate_filename(
document,
archive_filename=True,
use_format=False,
)
document.archive_filename = (
generated_archive_filename
)
create_source_path_directory(document.archive_path)
self._write(
archive_path,
document.archive_path,
)
if document.root_document_id:
document_updated.send(
sender=self.__class__,
document=document.root_document,
document.archive_checksum = compute_checksum(
document.archive_path,
)
# Don't save with the lock active. Saving will cause the file
# renaming logic to acquire the lock as well.
# This triggers things like file renaming
document.save()
DocumentVersion.objects.create(
document=document,
version_number=1,
checksum=document.checksum,
archive_checksum=document.archive_checksum,
content=document.content,
page_count=document.page_count,
mime_type=document.mime_type,
original_filename=document.original_filename,
filename=document.filename,
archive_filename=document.archive_filename,
added=document.added,
version_label=self.metadata.version_label,
)
# Delete the file only if it was successfully consumed
@@ -896,9 +1000,6 @@ class ConsumerPlugin(
if self.metadata.asn is not None:
document.archive_serial_number = self.metadata.asn
if self.metadata.version_label is not None:
document.version_label = self.metadata.version_label
if self.metadata.owner_id:
document.owner = User.objects.get(
pk=self.metadata.owner_id,
+9
View File
@@ -515,6 +515,15 @@ class Document(DocumentBase, SoftDeleteModel, ModelWithOwner): # type: ignore[d
def created_date(self):
return self.created
def get_effective_content(self) -> str:
"""Return the content to use for search indexing and matching.
This is a compatibility shim; since DocumentVersion now holds per-version
content, the Document.content cache field already reflects the latest version.
Task 9 will remove the callers of this method.
"""
return self.content
def add_nested_tags(self, tags) -> None:
tag_ids = set()
for tag in tags:
-17
View File
@@ -654,16 +654,6 @@ def update_filename_and_move_files(
root=settings.ARCHIVE_DIR,
)
# Keep version files in sync with root
if instance.root_document_id is None:
for version_doc in Document.objects.filter(root_document_id=instance.pk).only(
"pk",
):
update_filename_and_move_files(
Document,
version_doc,
)
@shared_task
def process_cf_select_update(custom_field: CustomField) -> None:
@@ -870,13 +860,6 @@ def run_workflows(
use_overrides = overrides is not None
if isinstance(document, Document) and document.root_document_id is not None:
logger.debug(
"Skipping workflow execution for version document %s",
document.pk,
)
return None
if original_file is None:
original_file = (
document.source_path if not use_overrides else document.original_file
+35 -26
View File
@@ -25,6 +25,7 @@ from documents.models import Correspondent
from documents.models import CustomField
from documents.models import Document
from documents.models import DocumentType
from documents.models import DocumentVersion
from documents.models import StoragePath
from documents.models import Tag
from documents.parsers import ParseError
@@ -725,7 +726,8 @@ class TestConsumer(
document = Document.objects.first()
assert document is not None
self.assertEqual(document.version_label, "v1")
version = DocumentVersion.objects.get(document=document, version_number=1)
self.assertEqual(version.version_label, "v1")
self._assert_first_last_send_progress()
@@ -790,16 +792,17 @@ class TestConsumer(
finally:
consumer.cleanup()
versions = Document.objects.filter(root_document=root_doc)
self.assertEqual(versions.count(), 1)
version = versions.first()
assert version is not None
assert version.original_filename is not None
self.assertEqual(version.version_index, 1)
self.assertEqual(version.version_label, "v2")
self.assertIsNone(version.archive_serial_number)
self.assertEqual(version.original_filename, version_file.name)
self.assertTrue(bool(version.content))
# Initial consume already created version_number=1.
# Version upload created version_number=2.
versions = DocumentVersion.objects.filter(document=root_doc).order_by(
"version_number",
)
self.assertEqual(versions.count(), 2)
uploaded = versions.get(version_number=2)
assert uploaded.original_filename is not None
self.assertEqual(uploaded.version_label, "v2")
self.assertEqual(uploaded.original_filename, version_file.name)
self.assertTrue(bool(uploaded.content))
@override_settings(AUDIT_LOG_ENABLED=True)
@mock.patch("documents.consumer.load_classifier")
@@ -852,14 +855,16 @@ class TestConsumer(
finally:
consumer.cleanup()
version = (
Document.objects.filter(root_document=root_doc).order_by("-id").first()
# Initial consume already created version_number=1.
# Version upload created version_number=2.
versions = DocumentVersion.objects.filter(document=root_doc).order_by(
"version_number",
)
self.assertIsNotNone(version)
assert version is not None
self.assertEqual(version.version_index, 1)
self.assertEqual(version.original_filename, "valid_pdf_version-upload")
self.assertTrue(bool(version.content))
uploaded = versions.get(version_number=2)
self.assertIsNotNone(uploaded)
assert uploaded is not None
self.assertEqual(uploaded.original_filename, "valid_pdf_version-upload")
self.assertTrue(bool(uploaded.content))
@override_settings(AUDIT_LOG_ENABLED=True)
@mock.patch("documents.consumer.load_classifier")
@@ -873,7 +878,7 @@ class TestConsumer(
self.assertIsNotNone(root_doc)
assert root_doc is not None
def consume_version(version_file: Path) -> Document:
def consume_version(version_file: Path) -> DocumentVersion:
status = DummyProgressManager(version_file.name, None)
overrides = DocumentMetadataOverrides()
doc = ConsumableDocument(
@@ -905,18 +910,22 @@ class TestConsumer(
consumer.cleanup()
version = (
Document.objects.filter(root_document=root_doc).order_by("-id").first()
DocumentVersion.objects.filter(document=root_doc)
.order_by("-version_number")
.first()
)
assert version is not None
return version
v1 = consume_version(self.get_test_file2())
self.assertEqual(v1.version_index, 1)
v1.delete()
# First upload: version_number=2 (version 1 was created at initial consume)
v1_dv = consume_version(self.get_test_file2())
self.assertEqual(v1_dv.version_number, 2)
v1_dv.delete()
# The next version should have version_index 2, even though version_index 1 was deleted
v2 = consume_version(self.get_test_file())
self.assertEqual(v2.version_index, 2)
# After deleting version_number=2, MAX is 1 (the initial consume version).
# The next upload gets MAX+1 = 2.
v2_dv = consume_version(self.get_test_file())
self.assertEqual(v2_dv.version_number, 2)
@mock.patch("documents.consumer.load_classifier")
def testClassifyDocument(self, m) -> None: