Compare commits

...

18 Commits

Author SHA1 Message Date
Trenton H
d37f889e41 chore: remove profiling pytest marker
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-14 10:52:37 -07:00
Trenton H
2f1e8d2224 chore: remove profiling test files
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-14 10:52:15 -07:00
Trenton H
2cfdd4f530 fix: update test_b1 query threshold — prefetch versions adds 1 query vs old correlated subquery
2026-04-14 10:33:47 -07:00
Trenton H
c368331a61 fix: final sweep — update stale tests to use DocumentVersion model
- test_document_model: replace root_document FK tests with DocumentVersion
  cascade test, and update suggestion_content/content_length tests to
  reflect that Document.content is now always current (no version proxy)
- test_matchables: replace obsolete root_document version-fallback tests
  with plain content-matching tests (matching now uses Document.content)
- test_workflows: replace "ignores version documents" tests (concept removed)
  with tests verifying workflows run correctly on versioned documents
- test_version_profile: rewrite corpora to use DocumentVersion.objects
  instead of old Document.root_document/version_index fields; fix module-
  scoped fixture teardown to use hard_delete() to prevent test isolation
  leaks into deleted_objects; keep pre-refactor baseline numbers in summary

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-14 10:25:47 -07:00
Trenton H
51cb7eff12 feat: update DocumentVersionInfo and Document interfaces for DocumentVersion API
- Add version_number field to DocumentVersionInfo interface
- Remove root_document field from Document interface (field removed from backend model)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-14 09:08:02 -07:00
Trenton H
18e4505e05 feat: remove root_document references from bulk_edit/tasks, add v1 DocumentVersion to factory
- bulk_edit.py: remove get_root_document/get_latest_version_for_root imports,
  remove _resolve_root_and_source_doc helper, remove select_related("root_document")
  calls, simplify delete() to use CASCADE behavior, update PDF action functions
  to use doc directly as root_doc/source_doc
- tasks.py: remove root_document__isnull=True filters from scheduled workflow
  queries (field removed from Document model)
- factories.py: add with_version post_generation hook to DocumentFactory so
  tests that bypass the consumer still get a v1 DocumentVersion
- test_bulk_edit.py: rewrite version-related tests to use DocumentVersion model
  instead of deprecated root_document Document pattern
- test_api_documents.py: add DocumentVersion.objects.create() alongside
  Document.objects.create() in tests that call download/preview/thumb/metadata
  endpoints, rewrite test_document_history_logs_version_deletion to use
  DocumentVersion objects

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-14 08:59:47 -07:00
Trenton H
e790d3e7f8 fix: remove get_effective_content call from matching.py and update task signal tests
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-14 08:23:07 -07:00
Trenton H
19d930a81a feat: update conditionals.py to use resolve_requested_version and DocumentVersion
- Replace resolve_effective_document_by_pk stub with direct get_object_or_404
  + resolve_requested_version calls in all five conditional functions
- Switch from .modified (shim) to .added (DocumentVersion native field)
- Switch thumbnail cache key to use version.id instead of document id
- Re-add get_root_document/get_latest_version_for_root stubs to versioning.py
  (bulk_edit.py still needs them; Task 10 will remove them)
- Update TestVersionAwareFilters tests to reflect simplified filter behavior
  (no more FieldError fallback; filters query content directly)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-14 08:05:34 -07:00
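The conditional functions described in this commit all follow the same shape: fetch the document, resolve the requested version, and return None when no version exists so the caller skips the header. A minimal plain-Python sketch of that pattern — `Document`, `Version`, and `resolve_requested_version` here are illustrative stand-ins, not the project's real models or helpers:

```python
from typing import Optional

class Version:
    def __init__(self, checksum: str):
        self.checksum = checksum

class Document:
    def __init__(self, pk: int, versions: Optional[list] = None):
        self.pk = pk
        # versions ordered oldest-first; version N lives at index N-1
        self.versions = versions or []

def resolve_requested_version(doc: Document, requested: Optional[int]) -> Optional[Version]:
    """Return the requested version, defaulting to the latest; None if unresolvable."""
    if not doc.versions:
        return None
    if requested is None:
        return doc.versions[-1]
    if 1 <= requested <= len(doc.versions):
        return doc.versions[requested - 1]
    return None

def metadata_etag(doc: Document, requested: Optional[int] = None) -> Optional[str]:
    version = resolve_requested_version(doc, requested)
    if version is None:
        return None  # no resolvable version -> no ETag header
    return version.checksum
```

With two versions, `metadata_etag(doc)` yields the latest checksum and `metadata_etag(doc, 1)` the first; an out-of-range request degrades to None rather than raising.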
Trenton H
2116ea3329 feat: remove effective_content machinery — Document.content is always current
- EffectiveContentFilter and TitleContentFilter simplified to query content directly
- Remove FieldError fallback try/except blocks; effective_content annotation gone
- add_to_index handler no longer calls get_effective_content()
- _build_tantivy_doc and add_or_update drop effective_content parameter
- Remove compatibility stubs from versioning.py (EffectiveDocumentResolution,
  resolve_effective_document_by_pk, get_root_document, get_latest_version_for_root)
- Remove get_effective_content() shim from Document model
- Remove DocumentVersion.modified compatibility property

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-14 07:13:05 -07:00
Trenton H
f18b56ed8a test: rewrite test_api_document_versions for DocumentVersion model
- Replace Document-as-version pattern with DocumentVersion objects
- Add _create_doc and _create_version helpers; remove _create_pdf
- Remove deleted root/ endpoint tests
- Fix views.py: use isinstance(content_doc, DocumentVersion) instead
  of id comparison (cross-table id collision), add Document.content
  sync when latest version content is edited
- Add compatibility stubs to versioning.py (get_root_document,
  get_latest_version_for_root, resolve_effective_document_by_pk,
  EffectiveDocumentResolution) so bulk_edit.py and conditionals.py
  imports resolve; Tasks 9 and 10 will refactor the callers
- Add DocumentVersion.modified property shim for conditionals.py

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-13 21:07:28 -07:00
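The cross-table id collision fixed above is easy to reproduce: a Document and a DocumentVersion can carry the same primary-key value because they live in separate tables, so an id comparison is ambiguous while an isinstance check is not. A small sketch with stand-in classes (not the project's real models):

```python
# Minimal stand-ins for two models stored in separate tables.
class Document:
    def __init__(self, pk: int):
        self.pk = pk

class DocumentVersion:
    def __init__(self, pk: int):
        self.pk = pk

def is_version(content_doc) -> bool:
    # Type check stays unambiguous even when pk values collide across tables.
    return isinstance(content_doc, DocumentVersion)

doc = Document(pk=7)
version = DocumentVersion(pk=7)  # same pk value, different table

assert doc.pk == version.pk      # id comparison cannot tell them apart
assert not is_version(doc)
assert is_version(version)
```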
Trenton H
45f32afcde feat: update DocumentViewSet version endpoints for DocumentVersion
Replace old version-as-Document pattern with DocumentVersion model
throughout the viewset: get_queryset prefetches DocumentVersion,
_resolve_file_doc returns DocumentVersion, delete_version /
update_version_label operate on DocumentVersion objects, and
_get_version_doc_for_root is replaced by _get_version_for_doc.
Remove root action and get_root_document / Subquery / OuterRef /
Coalesce usages no longer needed.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-13 15:49:05 -07:00
Trenton H
702f7ea57a feat: update DocumentVersionInfoSerializer and get_versions for DocumentVersion
- Replace old Document-row-based version info with DocumentVersion-based fields
- Add version_number field; derive is_root from version_number == 1
- Rewrite get_versions() to query DocumentVersion model instead of Document
- Remove root_document field from DocumentSerializer and Meta.fields
- Remove root_document__isnull filter from _get_viewable_duplicates
- Simplify to_representation: remove effective_content block, fix truncation

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-13 15:36:47 -07:00
Trenton H
a351dfa25c feat: consumer creates DocumentVersion on consume and version upload
- Rewrite _create_version_from_root to create DocumentVersion instead of
  a root_document Document row; uses MAX(version_number)+1 with
  SELECT FOR UPDATE on non-SQLite to prevent races
- Split FileLock block: version uploads write files to DocumentVersion,
  then sync Document cache fields (filename, checksum, content, etc.)
- Non-version path creates DocumentVersion(version_number=1) after
  document.save() so filenames are populated
- Remove version_label from apply_overrides (now applied at
  DocumentVersion creation time)
- Add get_effective_content() shim to Document (Task 9 will remove callers)
- Remove stale root_document_id references from signals/handlers.py

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-13 15:23:23 -07:00
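The version-number allocation this commit describes reduces to MAX(version_number) + 1 over the document's existing versions, with an empty set treated as 0; on non-SQLite backends a SELECT FOR UPDATE row lock keeps two concurrent consumers from reading the same MAX. A minimal sketch of just the arithmetic (the locking itself is database-side and not shown):

```python
def next_version_number(existing_numbers: list[int]) -> int:
    """MAX(version_number) + 1, where an empty version set counts as 0."""
    return max(existing_numbers, default=0) + 1

assert next_version_number([]) == 1        # first version of a fresh document
assert next_version_number([1, 2, 3]) == 4
```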
Trenton H
54f44c6c05 feat: update generate_filename/generate_unique_filename to accept DocumentVersion
Replace root_document_id/version_index suffix logic with an optional
`version: DocumentVersion | None` parameter. When passed, appends
`_vN` using version.version_number. Replace the four version-index tests
with a single new test that exercises the new signature.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-13 14:59:14 -07:00
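The suffix behavior this commit describes — append `_vN` from version.version_number when a version is passed, otherwise leave the name untouched — can be sketched in plain Python. The counter format and `.pdf` default here are illustrative assumptions, not the project's exact implementation:

```python
from pathlib import Path
from typing import Optional

class DocumentVersion:
    def __init__(self, version_number: int):
        self.version_number = version_number

def generate_filename(
    pk: int,
    version: Optional[DocumentVersion] = None,
    counter: int = 0,
    extension: str = ".pdf",  # assumed default for the sketch
) -> Path:
    # When a version is passed, append _vN using its version_number.
    version_suffix = f"_v{version.version_number}" if version is not None else ""
    counter_str = f"_{counter:02}" if counter else ""
    return Path(f"{pk:07}{version_suffix}{counter_str}{extension}")

assert str(generate_filename(42)) == "0000042.pdf"
assert str(generate_filename(42, DocumentVersion(3))) == "0000042_v3.pdf"
```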
Trenton H
8283642bfb refactor: extract DocumentBase abstract model shared by Document and DocumentVersion
Moves shared file-level fields (checksum, archive_checksum, content,
mime_type, added) and shared properties (has_archive_version, archive_path,
archive_file, file_type, thumbnail_file, get_public_filename) into a new
abstract base class DocumentBase. DocumentVersion overrides
_public_display_name() to use the parent document title. Produces no
migration changes.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-13 14:43:46 -07:00
Trenton H
1139e7f59b feat: rewrite versioning.py to operate on DocumentVersion
Replace the old root_document FK navigation with DocumentVersion
queryset lookups. Add DocumentVersionFactory to factories.py.
Rewrite test_version_conditionals.py to test the new API using
pytest style with factories.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-13 14:28:18 -07:00
Trenton H
fa96b4629b feat: replace versioning migrations with DocumentVersion table, renumber chain
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-13 14:22:41 -07:00
Trenton H
130fcc7e42 feat: add DocumentVersion model, remove version fields from Document
Introduces DocumentVersion as a dedicated model for per-version file
data, replacing the self-referential root_document/version_index/
version_label fields on Document. Removes get_effective_content() and
simplifies suggestion_content to use Document.content directly.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-13 14:01:58 -07:00
35 changed files with 1291 additions and 2064 deletions

View File

@@ -162,7 +162,6 @@ export interface Document extends ObjectWithPermissions {
duplicate_documents?: Document[]
// Versioning
root_document?: number
versions?: DocumentVersionInfo[]
// Frontend only
@@ -171,6 +170,7 @@ export interface Document extends ObjectWithPermissions {
export interface DocumentVersionInfo {
id: number
version_number: number
added?: Date
version_label?: string
checksum?: string

View File

@@ -29,8 +29,6 @@ from documents.plugins.helpers import DocumentsStatusManager
from documents.tasks import bulk_update_documents
from documents.tasks import consume_file
from documents.tasks import update_document_content_maybe_archive_file
from documents.versioning import get_latest_version_for_root
from documents.versioning import get_root_document
if TYPE_CHECKING:
from django.contrib.auth.models import User
@@ -81,23 +79,6 @@ def restore_archive_serial_numbers(backup: dict[int, int | None]) -> None:
logger.info(f"Restored archive serial numbers for documents {list(backup.keys())}")
def _resolve_root_and_source_doc(
doc: Document,
*,
source_mode: SourceMode = SourceModeChoices.LATEST_VERSION,
) -> tuple[Document, Document]:
root_doc = get_root_document(doc)
if source_mode == SourceModeChoices.EXPLICIT_SELECTION:
return root_doc, doc
# Version IDs are explicit by default, only a selected root resolves to latest
if doc.root_document_id is not None:
return root_doc, doc
return root_doc, get_latest_version_for_root(root_doc)
def set_correspondent(
doc_ids: list[int],
correspondent: Correspondent,
@@ -334,20 +315,10 @@ def modify_custom_fields(
@shared_task
def delete(doc_ids: list[int]) -> Literal["OK"]:
try:
root_ids = (
Document.objects.filter(id__in=doc_ids, root_document__isnull=True)
.values_list("id", flat=True)
.distinct()
)
version_ids = (
Document.objects.filter(root_document_id__in=root_ids)
.exclude(id__in=doc_ids)
.values_list("id", flat=True)
.distinct()
)
delete_ids = list({*doc_ids, *version_ids})
delete_ids = list(doc_ids)
Document.objects.filter(id__in=delete_ids).delete()
# DocumentVersion rows are removed by CASCADE automatically.
from documents.search import get_backend
@@ -413,7 +384,7 @@ def rotate(
)
docs_by_id = {
doc.id: doc
for doc in Document.objects.select_related("root_document").filter(
for doc in Document.objects.filter(
id__in=doc_ids,
)
}
@@ -422,11 +393,7 @@ def rotate(
doc = docs_by_id.get(doc_id)
if doc is None:
continue
root_doc, source_doc = _resolve_root_and_source_doc(
doc,
source_mode=source_mode,
)
docs_by_root_id.setdefault(root_doc.id, (root_doc, source_doc))
docs_by_root_id.setdefault(doc.id, (doc, doc))
import pikepdf
@@ -482,7 +449,7 @@ def merge(
logger.info(
f"Attempting to merge {len(doc_ids)} documents into a single document.",
)
qs = Document.objects.select_related("root_document").filter(id__in=doc_ids)
qs = Document.objects.filter(id__in=doc_ids)
docs_by_id = {doc.id: doc for doc in qs}
affected_docs: list[int] = []
import pikepdf
@@ -495,10 +462,7 @@ def merge(
doc = docs_by_id.get(doc_id)
if doc is None:
continue
_, source_doc = _resolve_root_and_source_doc(
doc,
source_mode=source_mode,
)
source_doc = doc
try:
doc_path = (
source_doc.archive_path
@@ -593,11 +557,8 @@ def split(
logger.info(
f"Attempting to split document {doc_ids[0]} into {len(pages)} documents",
)
doc = Document.objects.select_related("root_document").get(id=doc_ids[0])
_, source_doc = _resolve_root_and_source_doc(
doc,
source_mode=source_mode,
)
doc = Document.objects.get(id=doc_ids[0])
source_doc = doc
import pikepdf
consume_tasks = []
@@ -673,11 +634,9 @@ def delete_pages(
logger.info(
f"Attempting to delete pages {pages} from {len(doc_ids)} documents",
)
doc = Document.objects.select_related("root_document").get(id=doc_ids[0])
root_doc, source_doc = _resolve_root_and_source_doc(
doc,
source_mode=source_mode,
)
doc = Document.objects.get(id=doc_ids[0])
root_doc = doc
source_doc = doc
pages = sorted(pages) # sort pages to avoid index issues
import pikepdf
@@ -736,11 +695,9 @@ def edit_pdf(
logger.info(
f"Editing PDF of document {doc_ids[0]} with {len(operations)} operations",
)
doc = Document.objects.select_related("root_document").get(id=doc_ids[0])
root_doc, source_doc = _resolve_root_and_source_doc(
doc,
source_mode=source_mode,
)
doc = Document.objects.get(id=doc_ids[0])
root_doc = doc
source_doc = doc
import pikepdf
pdf_docs: list[pikepdf.Pdf] = []
@@ -860,11 +817,9 @@ def remove_password(
import pikepdf
for doc_id in doc_ids:
doc = Document.objects.select_related("root_document").get(id=doc_id)
root_doc, source_doc = _resolve_root_and_source_doc(
doc,
source_mode=source_mode,
)
doc = Document.objects.get(id=doc_id)
root_doc = doc
source_doc = doc
try:
logger.info(
f"Attempting password removal from document {doc_ids[0]}",

View File

@@ -4,6 +4,7 @@ from typing import Any
from django.conf import settings
from django.core.cache import cache
from django.shortcuts import get_object_or_404
from documents.caching import CACHE_5_MINUTES
from documents.caching import CACHE_50_MINUTES
@@ -13,7 +14,7 @@ from documents.caching import CLASSIFIER_VERSION_KEY
from documents.caching import get_thumbnail_modified_key
from documents.classifier import DocumentClassifier
from documents.models import Document
from documents.versioning import resolve_effective_document_by_pk
from documents.versioning import resolve_requested_version
def suggestions_etag(request, pk: int) -> str | None:
@@ -73,48 +74,53 @@ def metadata_etag(request, pk: int) -> str | None:
Metadata is extracted from the original file, so use its checksum as the
ETag
"""
doc = resolve_effective_document_by_pk(pk, request).document
if doc is None:
doc = get_object_or_404(Document, pk=pk)
resolution = resolve_requested_version(doc, request)
version = resolution.version
if version is None:
return None
return doc.checksum
return version.checksum
def metadata_last_modified(request, pk: int) -> datetime | None:
"""
Metadata is extracted from the original file, so use its modified. Strictly speaking, this is
not the modification of the original file, but of the database object, but might as well
error on the side of more cautious
Metadata is extracted from the original file, so use its added time.
"""
doc = resolve_effective_document_by_pk(pk, request).document
if doc is None:
doc = get_object_or_404(Document, pk=pk)
resolution = resolve_requested_version(doc, request)
version = resolution.version
if version is None:
return None
return doc.modified
return version.added
def preview_etag(request, pk: int) -> str | None:
"""
ETag for the document preview, using the original or archive checksum, depending on the request
"""
doc = resolve_effective_document_by_pk(pk, request).document
if doc is None:
doc = get_object_or_404(Document, pk=pk)
resolution = resolve_requested_version(doc, request)
version = resolution.version
if version is None:
return None
use_original = (
hasattr(request, "query_params")
and "original" in request.query_params
and request.query_params["original"] == "true"
)
return doc.checksum if use_original else doc.archive_checksum
return version.checksum if use_original else version.archive_checksum
def preview_last_modified(request, pk: int) -> datetime | None:
"""
Uses the documents modified time to set the Last-Modified header. Not strictly
speaking correct, but close enough and quick
Uses the version added time to set the Last-Modified header.
"""
doc = resolve_effective_document_by_pk(pk, request).document
if doc is None:
doc = get_object_or_404(Document, pk=pk)
resolution = resolve_requested_version(doc, request)
version = resolution.version
if version is None:
return None
return doc.modified
return version.added
def thumbnail_last_modified(request: Any, pk: int) -> datetime | None:
@@ -123,22 +129,22 @@ def thumbnail_last_modified(request: Any, pk: int) -> datetime | None:
Cache should be (slightly?) faster than filesystem
"""
try:
doc = resolve_effective_document_by_pk(pk, request).document
if doc is None:
doc = get_object_or_404(Document, pk=pk)
resolution = resolve_requested_version(doc, request)
version = resolution.version
if version is None:
return None
if not doc.thumbnail_path.exists():
if not version.thumbnail_path.exists():
return None
# Use the effective document id for cache key
doc_key = get_thumbnail_modified_key(doc.id)
doc_key = get_thumbnail_modified_key(version.id)
cache_hit = cache.get(doc_key)
if cache_hit is not None:
cache.touch(doc_key, CACHE_50_MINUTES)
return cache_hit
# No cache, get the timestamp and cache the datetime
last_modified = datetime.fromtimestamp(
doc.thumbnail_path.stat().st_mtime,
version.thumbnail_path.stat().st_mtime,
tz=UTC,
)
cache.set(doc_key, last_modified, CACHE_50_MINUTES)

View File

@@ -11,6 +11,7 @@ from typing import Final
import magic
from django.conf import settings
from django.contrib.auth.models import User
from django.db import connection
from django.db import transaction
from django.db.models import Max
from django.db.models import Q
@@ -30,6 +31,7 @@ from documents.models import CustomField
from documents.models import CustomFieldInstance
from documents.models import Document
from documents.models import DocumentType
from documents.models import DocumentVersion
from documents.models import StoragePath
from documents.models import Tag
from documents.models import WorkflowTrigger
@@ -250,39 +252,41 @@ class ConsumerPlugin(
text: str | None,
page_count: int | None,
mime_type: str,
) -> Document:
self.log.debug("Saving record for updated version to database")
root_doc_frozen = Document.objects.select_for_update().get(pk=root_doc.pk)
next_version_index = (
Document.global_objects.filter(
root_document_id=root_doc_frozen.pk,
).aggregate(
max_index=Max("version_index"),
)["max_index"]
) -> DocumentVersion:
self.log.debug("Saving record for new version to database")
# SQLite uses BEGIN EXCLUSIVE on write inside transaction.atomic(), which gives
# serializable isolation — SELECT FOR UPDATE is both unnecessary and unsupported.
# PostgreSQL and MariaDB need the explicit row lock to prevent concurrent version
# number races; the lock is held for the duration of the outer transaction.
if connection.vendor != "sqlite":
DocumentVersion.objects.select_for_update().filter(
document=root_doc,
).exists()
next_number = (
DocumentVersion.objects.filter(document=root_doc).aggregate(
max_num=Max("version_number"),
)["max_num"]
or 0
)
) + 1
file_for_checksum = (
self.unmodified_original
if self.unmodified_original is not None
else self.working_copy
)
version_doc = Document(
root_document=root_doc_frozen,
version_index=next_version_index + 1,
new_version = DocumentVersion(
document=root_doc,
version_number=next_number,
checksum=compute_checksum(file_for_checksum),
content=text or "",
page_count=page_count,
mime_type=mime_type,
original_filename=self.filename,
owner_id=root_doc_frozen.owner_id,
created=root_doc_frozen.created,
title=root_doc_frozen.title,
added=timezone.now(),
modified=timezone.now(),
)
if self.metadata.version_label is not None:
version_doc.version_label = self.metadata.version_label
return version_doc
new_version.version_label = self.metadata.version_label
return new_version
def run_pre_consume_script(self) -> None:
"""
@@ -586,21 +590,17 @@ class ConsumerPlugin(
with transaction.atomic():
# store the document.
if self.input_doc.root_document_id:
# If this is a new version of an existing document, we need
# to make sure we're not creating a new document, but updating
# the existing one.
root_doc = Document.objects.get(
pk=self.input_doc.root_document_id,
)
original_document = self._create_version_from_root(
new_version = self._create_version_from_root(
root_doc,
text=text,
page_count=page_count,
mime_type=mime_type,
)
actor = None
# Save the new version, potentially creating an audit log entry for the version addition if enabled.
actor = None
if (
settings.AUDIT_LOG_ENABLED
and self.metadata.actor_id is not None
@@ -608,37 +608,33 @@ class ConsumerPlugin(
actor = User.objects.filter(
pk=self.metadata.actor_id,
).first()
if actor is not None:
from auditlog.context import ( # type: ignore[import-untyped]
set_actor,
)
with set_actor(actor):
original_document.save()
else:
original_document.save()
if actor is not None:
from auditlog.context import (
set_actor, # type: ignore[import-untyped]
)
with set_actor(actor):
new_version.save()
else:
original_document.save()
new_version.save()
# Create a log entry for the version addition, if enabled
if settings.AUDIT_LOG_ENABLED:
from auditlog.models import ( # type: ignore[import-untyped]
LogEntry,
from auditlog.models import (
LogEntry, # type: ignore[import-untyped]
)
LogEntry.objects.log_create(
instance=root_doc,
changes={
"Version Added": ["None", original_document.id],
},
changes={"Version Added": ["None", new_version.pk]},
action=LogEntry.Action.UPDATE,
actor=actor,
additional_data={
"reason": "Version added",
"version_id": original_document.id,
"version_id": new_version.pk,
},
)
document = original_document
document = root_doc
else:
document = self._store(
text=text,
@@ -666,71 +662,179 @@ class ConsumerPlugin(
# After everything is in the database, copy the files into
# place. If this fails, we'll also rollback the transaction.
with FileLock(settings.MEDIA_LOCK):
generated_filename = generate_unique_filename(document)
if (
len(str(generated_filename))
> Document.MAX_STORED_FILENAME_LENGTH
):
self.log.warning(
"Generated source filename exceeds db path limit, falling back to default naming",
)
generated_filename = generate_filename(
document,
use_format=False,
)
document.filename = generated_filename
create_source_path_directory(document.source_path)
self._write(
self.unmodified_original
if self.unmodified_original is not None
else self.working_copy,
document.source_path,
)
self._write(
thumbnail,
document.thumbnail_path,
)
if archive_path and Path(archive_path).is_file():
generated_archive_filename = generate_unique_filename(
document,
archive_filename=True,
if self.input_doc.root_document_id:
with FileLock(settings.MEDIA_LOCK):
generated_filename = generate_unique_filename(
root_doc,
new_version,
)
if (
len(str(generated_archive_filename))
len(str(generated_filename))
> DocumentVersion.MAX_STORED_FILENAME_LENGTH
):
self.log.warning(
"Generated source filename exceeds db path limit, falling back to default naming",
)
generated_filename = generate_filename(
root_doc,
new_version,
use_format=False,
)
new_version.filename = generated_filename
create_source_path_directory(new_version.source_path)
self._write(
self.unmodified_original
if self.unmodified_original is not None
else self.working_copy,
new_version.source_path,
)
self._write(thumbnail, new_version.thumbnail_path)
if archive_path and Path(archive_path).is_file():
generated_archive_filename = (
generate_unique_filename(
root_doc,
new_version,
archive_filename=True,
)
)
if (
len(str(generated_archive_filename))
> DocumentVersion.MAX_STORED_FILENAME_LENGTH
):
generated_archive_filename = generate_filename(
root_doc,
new_version,
archive_filename=True,
use_format=False,
)
new_version.archive_filename = (
generated_archive_filename
)
create_source_path_directory(
new_version.archive_path,
)
self._write(archive_path, new_version.archive_path)
new_version.archive_checksum = compute_checksum(
new_version.archive_path,
)
new_version.save(
update_fields=[
"filename",
"archive_filename",
"archive_checksum",
],
)
# Sync all Document cache fields from the new version so search/matching
# and file-serving remain correct without any subquery.
root_doc.content = new_version.content
root_doc.checksum = new_version.checksum
root_doc.archive_checksum = new_version.archive_checksum
root_doc.filename = new_version.filename
root_doc.archive_filename = new_version.archive_filename
root_doc.mime_type = new_version.mime_type
root_doc.page_count = new_version.page_count
root_doc.original_filename = new_version.original_filename
root_doc.modified = timezone.now()
root_doc.save(
update_fields=[
"content",
"checksum",
"archive_checksum",
"filename",
"archive_filename",
"mime_type",
"page_count",
"original_filename",
"modified",
],
)
document_updated.send(
sender=self.__class__,
document=root_doc,
)
else:
with FileLock(settings.MEDIA_LOCK):
generated_filename = generate_unique_filename(document)
if (
len(str(generated_filename))
> Document.MAX_STORED_FILENAME_LENGTH
):
self.log.warning(
"Generated archive filename exceeds db path limit, falling back to default naming",
"Generated source filename exceeds db path limit, falling back to default naming",
)
generated_archive_filename = generate_filename(
generated_filename = generate_filename(
document,
archive_filename=True,
use_format=False,
)
document.archive_filename = generated_archive_filename
create_source_path_directory(document.archive_path)
document.filename = generated_filename
create_source_path_directory(document.source_path)
self._write(
archive_path,
document.archive_path,
self.unmodified_original
if self.unmodified_original is not None
else self.working_copy,
document.source_path,
)
document.archive_checksum = compute_checksum(
document.archive_path,
self._write(
thumbnail,
document.thumbnail_path,
)
# Don't save with the lock active. Saving will cause the file
# renaming logic to acquire the lock as well.
# This triggers things like file renaming
document.save()
if archive_path and Path(archive_path).is_file():
generated_archive_filename = (
generate_unique_filename(
document,
archive_filename=True,
)
)
if (
len(str(generated_archive_filename))
> Document.MAX_STORED_FILENAME_LENGTH
):
self.log.warning(
"Generated archive filename exceeds db path limit, falling back to default naming",
)
generated_archive_filename = generate_filename(
document,
archive_filename=True,
use_format=False,
)
document.archive_filename = (
generated_archive_filename
)
create_source_path_directory(document.archive_path)
self._write(
archive_path,
document.archive_path,
)
if document.root_document_id:
document_updated.send(
sender=self.__class__,
document=document.root_document,
document.archive_checksum = compute_checksum(
document.archive_path,
)
# Don't save with the lock active. Saving will cause the file
# renaming logic to acquire the lock as well.
# This triggers things like file renaming
document.save()
DocumentVersion.objects.create(
document=document,
version_number=1,
checksum=document.checksum,
archive_checksum=document.archive_checksum,
content=document.content,
page_count=document.page_count,
mime_type=document.mime_type,
original_filename=document.original_filename,
filename=document.filename,
archive_filename=document.archive_filename,
added=document.added,
version_label=self.metadata.version_label,
)
# Delete the file only if it was successfully consumed
@@ -896,9 +1000,6 @@ class ConsumerPlugin(
if self.metadata.asn is not None:
document.archive_serial_number = self.metadata.asn
if self.metadata.version_label is not None:
document.version_label = self.metadata.version_label
if self.metadata.owner_id:
document.owner = User.objects.get(
pk=self.metadata.owner_id,

View File

@@ -1,12 +1,18 @@
from __future__ import annotations
import os
from pathlib import Path
from typing import TYPE_CHECKING
from django.conf import settings
from documents.models import Document
from documents.templating.filepath import validate_filepath_template_and_render
from documents.templating.utils import convert_format_str_to_template_format
if TYPE_CHECKING:
from documents.models import Document
from documents.models import DocumentVersion
def create_source_path_directory(source_path: Path) -> None:
source_path.parent.mkdir(parents=True, exist_ok=True)
@@ -41,7 +47,12 @@ def delete_empty_directories(directory: Path, root: Path) -> None:
directory = directory.parent
def generate_unique_filename(doc, *, archive_filename=False) -> Path:
def generate_unique_filename(
doc: Document,
version: DocumentVersion | None = None,
*,
archive_filename: bool = False,
) -> Path:
"""
Generates a unique filename for doc in settings.ORIGINALS_DIR.
@@ -67,7 +78,11 @@ def generate_unique_filename(doc, *, archive_filename=False) -> Path:
if archive_filename and doc.filename:
# Generate the full path using the same logic as generate_filename
base_generated = generate_filename(doc, archive_filename=archive_filename)
base_generated = generate_filename(
doc,
version,
archive_filename=archive_filename,
)
# Try to create a simple PDF version based on the original filename
# but preserve any directory structure from the template
@@ -86,6 +101,7 @@ def generate_unique_filename(doc, *, archive_filename=False) -> Path:
while True:
new_filename = generate_filename(
doc,
version,
counter=counter,
archive_filename=archive_filename,
)
@@ -124,24 +140,19 @@ def format_filename(document: Document, template_str: str) -> str | None:
def generate_filename(
doc: Document,
version: DocumentVersion | None = None,
*,
counter=0,
archive_filename=False,
use_format=True,
counter: int = 0,
archive_filename: bool = False,
use_format: bool = True,
) -> Path:
# version docs use the root document for formatting, just with a suffix
context_doc = doc if doc.root_document_id is None else doc.root_document
version_suffix = (
f"_v{doc.version_index}"
if doc.root_document_id is not None and doc.version_index is not None
else ""
)
version_suffix = f"_v{version.version_number}" if version is not None else ""
base_path: Path | None = None
# Determine the source of the format string
if use_format:
if context_doc.storage_path is not None:
filename_format = context_doc.storage_path.path
if doc.storage_path is not None:
filename_format = doc.storage_path.path
elif settings.FILENAME_FORMAT is not None:
# Maybe convert old to new style
filename_format = convert_format_str_to_template_format(
@@ -154,7 +165,7 @@ def generate_filename(
# If we have one, render it
if filename_format is not None:
rendered_path: str | None = format_filename(context_doc, filename_format)
rendered_path: str | None = format_filename(doc, filename_format)
if rendered_path:
base_path = Path(rendered_path)
@@ -177,9 +188,7 @@ def generate_filename(
full_path = Path(final_filename)
else:
# No template, use document ID
final_filename = (
f"{context_doc.pk:07}{version_suffix}{counter_str}{filetype_str}"
)
final_filename = f"{doc.pk:07}{version_suffix}{counter_str}{filetype_str}"
full_path = Path(final_filename)
return full_path

View File

@@ -10,7 +10,6 @@ from typing import TYPE_CHECKING
from typing import Any
from django.contrib.contenttypes.models import ContentType
from django.core.exceptions import FieldError
from django.db.models import Case
from django.db.models import CharField
from django.db.models import Count
@@ -172,16 +171,10 @@ class TitleContentFilter(Filter):
logger.warning(
"Deprecated document filter parameter 'title_content' used; use `text` instead.",
)
try:
return qs.filter(
Q(title__icontains=value) | Q(effective_content__icontains=value),
)
except FieldError:
return qs.filter(
Q(title__icontains=value) | Q(content__icontains=value),
)
else:
return qs
return qs.filter(
Q(title__icontains=value) | Q(content__icontains=value),
)
return qs
@extend_schema_field(serializers.CharField)
@@ -190,14 +183,7 @@ class EffectiveContentFilter(Filter):
value = value.strip() if isinstance(value, str) else value
if not value:
return qs
try:
return qs.filter(
**{f"effective_content__{self.lookup_expr}": value},
)
except FieldError:
return qs.filter(
**{f"content__{self.lookup_expr}": value},
)
return qs.filter(**{f"content__{self.lookup_expr}": value})
@extend_schema_field(serializers.BooleanField)

View File
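With `effective_content` gone, the filters above reduce to a single OR over `title` and `content`. A plain-Python sketch of the matching semantics (Django's `icontains` is a case-insensitive substring test); the dict shape here is illustrative, not the ORM:

```python
def title_content_matches(doc: dict, value: str) -> bool:
    """Approximate Q(title__icontains=value) | Q(content__icontains=value)."""
    needle = value.strip().lower()
    if not needle:
        # The real filter returns the queryset unchanged for an empty value,
        # i.e. no filtering is applied.
        return True
    return needle in doc["title"].lower() or needle in doc["content"].lower()


docs = [
    {"title": "Invoice 2024", "content": "total due"},
    {"title": "Receipt", "content": "paid in full"},
]
print([d["title"] for d in docs if title_content_matches(d, "INVOICE")])
# ['Invoice 2024']
```

The removed `try`/`except FieldError` branches existed only to fall back when the `effective_content` annotation was absent; with one content field there is nothing to fall back from.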

@@ -169,7 +169,7 @@ def match_storage_paths(document: Document, classifier: DocumentClassifier, user
def matches(matching_model: MatchingModel, document: Document):
search_flags = 0
document_content = document.get_effective_content() or ""
document_content = document.content or ""
# Check that match is not empty
if not matching_model.match.strip():

View File

@@ -0,0 +1,153 @@
from __future__ import annotations
import django.core.validators
import django.db.models
import django.db.models.deletion
import django.utils.timezone
from django.db import migrations
from django.db import models
class Migration(migrations.Migration):
dependencies = [
("documents", "0011_alter_workflowaction_type"),
]
operations = [
migrations.CreateModel(
name="DocumentVersion",
fields=[
(
"id",
models.AutoField(
auto_created=True,
primary_key=True,
serialize=False,
verbose_name="ID",
),
),
(
"version_number",
models.PositiveSmallIntegerField(
help_text="Sequential version number within this document, starting at 1.",
verbose_name="version number",
),
),
(
"version_label",
models.CharField(
blank=True,
help_text="Optional short label for this version.",
max_length=64,
null=True,
verbose_name="version label",
),
),
(
"added",
models.DateTimeField(
db_index=True,
default=django.utils.timezone.now,
editable=False,
verbose_name="added",
),
),
(
"checksum",
models.CharField(
editable=False,
help_text="SHA-256 checksum of the original file for this version.",
max_length=64,
verbose_name="checksum",
),
),
(
"archive_checksum",
models.CharField(
blank=True,
editable=False,
max_length=64,
null=True,
verbose_name="archive checksum",
),
),
(
"content",
models.TextField(
blank=True,
help_text="OCR text content of this version.",
verbose_name="content",
),
),
(
"page_count",
models.PositiveIntegerField(
blank=True,
null=True,
validators=[django.core.validators.MinValueValidator(1)],
verbose_name="page count",
),
),
(
"mime_type",
models.CharField(
editable=False,
max_length=256,
verbose_name="mime type",
),
),
(
"original_filename",
models.CharField(
blank=True,
editable=False,
max_length=1024,
null=True,
verbose_name="original filename",
),
),
(
"filename",
models.FilePathField(
default=None,
editable=False,
help_text="Stored filename for this version's original file.",
max_length=1024,
null=True,
verbose_name="filename",
),
),
(
"archive_filename",
models.FilePathField(
default=None,
editable=False,
max_length=1024,
null=True,
verbose_name="archive filename",
),
),
(
"document",
models.ForeignKey(
on_delete=django.db.models.deletion.CASCADE,
related_name="versions",
to="documents.document",
verbose_name="document",
),
),
],
options={
"verbose_name": "document version",
"verbose_name_plural": "document versions",
"ordering": ["-version_number"],
},
),
migrations.AddConstraint(
model_name="documentversion",
constraint=models.UniqueConstraint(
fields=("document", "version_number"),
name="documents_documentversion_doc_number_uniq",
),
),
]

View File

@@ -1,37 +0,0 @@
# Generated by Django 5.1.6 on 2025-02-26 17:08
import django.db.models.deletion
from django.db import migrations
from django.db import models
class Migration(migrations.Migration):
dependencies = [
("documents", "0011_alter_workflowaction_type"),
]
operations = [
migrations.AddField(
model_name="document",
name="root_document",
field=models.ForeignKey(
blank=True,
null=True,
on_delete=django.db.models.deletion.CASCADE,
related_name="versions",
to="documents.document",
verbose_name="root document for this version",
),
),
migrations.AddField(
model_name="document",
name="version_label",
field=models.CharField(
blank=True,
help_text="Optional short label for a document version.",
max_length=64,
null=True,
verbose_name="version label",
),
),
]

View File

@@ -6,7 +6,7 @@ from django.db import models
class Migration(migrations.Migration):
dependencies = [
("documents", "0012_document_root_document"),
("documents", "0012_add_document_version"),
]
operations = [

View File

@@ -1,37 +0,0 @@
# Generated by Django 5.2.11 on 2026-03-02 17:48
from django.conf import settings
from django.db import migrations
from django.db import models
class Migration(migrations.Migration):
dependencies = [
("documents", "0014_savedview_visibility_to_ui_settings"),
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
]
operations = [
migrations.AddField(
model_name="document",
name="version_index",
field=models.PositiveIntegerField(
blank=True,
db_index=True,
help_text="Index of this version within the root document.",
null=True,
verbose_name="version index",
),
),
migrations.AddConstraint(
model_name="document",
constraint=models.UniqueConstraint(
condition=models.Q(
("root_document__isnull", False),
("version_index__isnull", False),
),
fields=("root_document", "version_index"),
name="documents_document_root_version_index_uniq",
),
),
]

View File

@@ -100,7 +100,7 @@ def recompute_checksums(apps, schema_editor):
class Migration(migrations.Migration):
dependencies = [
("documents", "0015_document_version_index_and_more"),
("documents", "0014_savedview_visibility_to_ui_settings"),
]
operations = [

View File

@@ -28,7 +28,7 @@ def migrate_fulltext_query_field_prefixes(apps, schema_editor):
class Migration(migrations.Migration):
dependencies = [
("documents", "0016_sha256_checksums"),
("documents", "0015_sha256_checksums"),
]
operations = [

View File

@@ -22,7 +22,7 @@ def migrate_saved_view_rules_forward(apps, schema_editor):
class Migration(migrations.Migration):
dependencies = [
("documents", "0017_migrate_fulltext_query_field_prefixes"),
("documents", "0016_migrate_fulltext_query_field_prefixes"),
]
operations = [

View File

@@ -155,7 +155,189 @@ class StoragePath(MatchingModel):
verbose_name_plural = _("storage paths")
class Document(SoftDeleteModel, ModelWithOwner): # type: ignore[django-manager-missing]
class DocumentBase(models.Model):
"""Abstract base shared by Document and DocumentVersion.
Holds the file-level fields (checksums, content, mime_type, added) and
the file-serving properties that both concrete models share identically.
Subclasses must implement ``source_path`` and ``thumbnail_path``.
"""
checksum = models.CharField(
_("checksum"),
max_length=64,
editable=False,
)
archive_checksum = models.CharField(
_("archive checksum"),
max_length=64,
blank=True,
null=True,
editable=False,
)
content = models.TextField(
_("content"),
blank=True,
)
mime_type = models.CharField(_("mime type"), max_length=256, editable=False)
added = models.DateTimeField(
_("added"),
default=timezone.now,
editable=False,
db_index=True,
)
class Meta:
abstract = True
def _public_display_name(self) -> str:
"""Return the human-readable title used in get_public_filename."""
return str(self)
@property
def has_archive_version(self) -> bool:
return self.archive_filename is not None
@property
def archive_path(self) -> Path | None:
if self.archive_filename is not None:
return (settings.ARCHIVE_DIR / Path(str(self.archive_filename))).resolve()
return None
@property
def archive_file(self):
path = self.archive_path
if path is None:
raise ValueError(f"{self!r} has no archive file")
return path.open("rb")
@property
def file_type(self) -> str:
return get_default_file_extension(self.mime_type)
@property
def thumbnail_file(self):
return self.thumbnail_path.open("rb")
def get_public_filename(self, *, archive=False, counter=0, suffix=None) -> str:
"""Return a sanitized filename for download."""
result = self._public_display_name()
if counter:
result += f"_{counter:02}"
if suffix:
result += suffix
if archive:
result += ".pdf"
else:
result += self.file_type
return pathvalidate.sanitize_filename(result, replacement_text="-")
class DocumentVersion(DocumentBase):
"""
Stores per-version file data for a document.
Version 1 is created on initial consume; subsequent uploads add higher numbers.
Document.filename / content / checksum always reflect the latest version.
DocumentVersion.pk is used as the version ID in API calls.
version_number is a per-document sequential integer used for filename suffixes (_v2, etc.).
"""
MAX_STORED_FILENAME_LENGTH: Final[int] = 1024
document = models.ForeignKey(
"Document",
on_delete=models.CASCADE,
related_name="versions",
verbose_name=_("document"),
)
version_number = models.PositiveSmallIntegerField(
_("version number"),
help_text=_("Sequential version number within this document, starting at 1."),
)
version_label = models.CharField(
_("version label"),
max_length=64,
blank=True,
null=True,
help_text=_("Optional short label for this version."),
)
page_count = models.PositiveIntegerField(
_("page count"),
blank=True,
null=True,
validators=[MinValueValidator(1)],
)
original_filename = models.CharField(
_("original filename"),
max_length=MAX_STORED_FILENAME_LENGTH,
editable=False,
null=True,
blank=True,
)
filename = models.FilePathField(
_("filename"),
max_length=MAX_STORED_FILENAME_LENGTH,
editable=False,
default=None,
null=True,
help_text=_("Stored filename for this version's original file."),
)
archive_filename = models.FilePathField(
_("archive filename"),
max_length=MAX_STORED_FILENAME_LENGTH,
editable=False,
default=None,
null=True,
)
class Meta:
ordering = ["-version_number"]
verbose_name = _("document version")
verbose_name_plural = _("document versions")
constraints = [
models.UniqueConstraint(
fields=["document", "version_number"],
name="documents_documentversion_doc_number_uniq",
),
]
def __str__(self) -> str:
return f"DocumentVersion {self.version_number} of document {self.document_id}"
def _public_display_name(self) -> str:
return str(self.document)
@property
def source_path(self) -> Path:
if self.filename is None:
raise ValueError(f"DocumentVersion {self.pk} has no filename set")
return (settings.ORIGINALS_DIR / Path(str(self.filename))).resolve()
@property
def thumbnail_path(self) -> Path:
# Prefix "v" avoids collision with Document thumbnails ({pk:07}.webp)
return (settings.THUMBNAIL_DIR / f"v{self.pk:07}.webp").resolve()
@property
def source_file(self):
return self.source_path.open("rb")
class Document(DocumentBase, SoftDeleteModel, ModelWithOwner): # type: ignore[django-manager-missing]
MAX_STORED_FILENAME_LENGTH: Final[int] = 1024
correspondent = models.ForeignKey(
@@ -187,15 +369,6 @@ class Document(SoftDeleteModel, ModelWithOwner): # type: ignore[django-manager-
verbose_name=_("document type"),
)
content = models.TextField(
_("content"),
blank=True,
help_text=_(
"The raw, text-only data of the document. This field is "
"primarily used for searching.",
),
)
content_length = models.GeneratedField(
expression=Length("content"),
output_field=PositiveIntegerField(default=0),
@@ -205,8 +378,6 @@ class Document(SoftDeleteModel, ModelWithOwner): # type: ignore[django-manager-
help_text="Length of the content field in characters. Automatically maintained by the database for faster statistics computation.",
)
mime_type = models.CharField(_("mime type"), max_length=256, editable=False)
tags = models.ManyToManyField(
Tag,
related_name="documents",
@@ -214,22 +385,6 @@ class Document(SoftDeleteModel, ModelWithOwner): # type: ignore[django-manager-
verbose_name=_("tags"),
)
checksum = models.CharField(
_("checksum"),
max_length=64,
editable=False,
help_text=_("The checksum of the original document."),
)
archive_checksum = models.CharField(
_("archive checksum"),
max_length=64,
editable=False,
blank=True,
null=True,
help_text=_("The checksum of the archived document."),
)
page_count = models.PositiveIntegerField(
_("page count"),
blank=False,
@@ -255,13 +410,6 @@ class Document(SoftDeleteModel, ModelWithOwner): # type: ignore[django-manager-
db_index=True,
)
added = models.DateTimeField(
_("added"),
default=timezone.now,
editable=False,
db_index=True,
)
filename = models.FilePathField(
_("filename"),
max_length=MAX_STORED_FILENAME_LENGTH,
@@ -310,45 +458,10 @@ class Document(SoftDeleteModel, ModelWithOwner): # type: ignore[django-manager-
),
)
root_document = models.ForeignKey(
"self",
blank=True,
null=True,
related_name="versions",
on_delete=models.CASCADE,
verbose_name=_("root document for this version"),
)
version_index = models.PositiveIntegerField(
_("version index"),
blank=True,
null=True,
db_index=True,
help_text=_("Index of this version within the root document."),
)
version_label = models.CharField(
_("version label"),
max_length=64,
blank=True,
null=True,
help_text=_("Optional short label for a document version."),
)
class Meta:
ordering = ("-created",)
verbose_name = _("document")
verbose_name_plural = _("documents")
constraints = [
models.UniqueConstraint(
fields=["root_document", "version_index"],
condition=models.Q(
root_document__isnull=False,
version_index__isnull=False,
),
name="documents_document_root_version_index_uniq",
),
]
def __str__(self) -> str:
created = self.created.isoformat()
@@ -361,45 +474,6 @@ class Document(SoftDeleteModel, ModelWithOwner): # type: ignore[django-manager-
res += f" {self.title}"
return res
def get_effective_content(self) -> str | None:
"""
Returns the effective content for the document.
For root documents, this is the latest version's content when available.
For version documents, this is always the document's own content.
If the queryset already annotated ``effective_content``, that value is used.
"""
if hasattr(self, "effective_content"):
return getattr(self, "effective_content")
if self.root_document_id is not None or self.pk is None:
return self.content
prefetched_cache = getattr(self, "_prefetched_objects_cache", None)
prefetched_versions = (
prefetched_cache.get("versions")
if isinstance(prefetched_cache, dict)
else None
)
if prefetched_versions is not None:
# Empty list means prefetch ran and found no versions — use own content.
if not prefetched_versions:
return self.content
latest_prefetched = max(prefetched_versions, key=lambda doc: doc.id)
return latest_prefetched.content
latest_version_content = (
Document.objects.filter(root_document=self)
.order_by("-id")
.values_list("content", flat=True)
.first()
)
return (
latest_version_content
if latest_version_content is not None
else self.content
)
@property
def suggestion_content(self):
"""
@@ -412,21 +486,12 @@ class Document(SoftDeleteModel, ModelWithOwner): # type: ignore[django-manager-
This improves processing speed for large documents while keeping
enough context for accurate suggestions.
"""
effective_content = self.get_effective_content()
if not effective_content or len(effective_content) <= 1200000:
return effective_content
else:
# Use 80% from the start and 20% from the end
# to preserve both opening and closing context.
head_len = 800000
tail_len = 200000
return " ".join(
(
effective_content[:head_len],
effective_content[-tail_len:],
),
)
content = self.content
if not content or len(content) <= 1200000:
return content
head_len = 800000
tail_len = 200000
return " ".join((content[:head_len], content[-tail_len:]))
@property
def source_path(self) -> Path:
@@ -436,45 +501,7 @@ class Document(SoftDeleteModel, ModelWithOwner): # type: ignore[django-manager-
@property
def source_file(self):
return Path(self.source_path).open("rb")
@property
def has_archive_version(self) -> bool:
return self.archive_filename is not None
@property
def archive_path(self) -> Path | None:
if self.has_archive_version:
return (settings.ARCHIVE_DIR / Path(str(self.archive_filename))).resolve()
else:
return None
@property
def archive_file(self):
return Path(self.archive_path).open("rb")
def get_public_filename(self, *, archive=False, counter=0, suffix=None) -> str:
"""
Returns a sanitized filename for the document, not including any paths.
"""
result = str(self)
if counter:
result += f"_{counter:02}"
if suffix:
result += suffix
if archive:
result += ".pdf"
else:
result += self.file_type
return pathvalidate.sanitize_filename(result, replacement_text="-")
@property
def file_type(self):
return get_default_file_extension(self.mime_type)
return self.source_path.open("rb")
@property
def thumbnail_path(self) -> Path:
@@ -484,10 +511,6 @@ class Document(SoftDeleteModel, ModelWithOwner): # type: ignore[django-manager-
return webp_file_path.resolve()
@property
def thumbnail_file(self):
return Path(self.thumbnail_path).open("rb")
@property
def created_date(self):
return self.created
@@ -501,19 +524,6 @@ class Document(SoftDeleteModel, ModelWithOwner): # type: ignore[django-manager-
tags_to_add = self.tags.model.objects.filter(id__in=tag_ids)
self.tags.add(*tags_to_add)
def delete(
self,
*args,
**kwargs,
):
# If deleting a root document, move all its versions to trash as well.
if self.root_document_id is None:
Document.objects.filter(root_document=self).delete()
return super().delete(
*args,
**kwargs,
)
class SavedView(ModelWithOwner):
class DisplayMode(models.TextChoices):

View File
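A hypothetical helper (not shown in the diff) illustrating how a new `version_number` could be allocated under the schema above: numbers are sequential per document starting at 1, and the `UniqueConstraint` on `(document, version_number)` rejects concurrent duplicates at the database level rather than in Python.

```python
def next_version_number(existing: list[int]) -> int:
    """Allocate the next sequential version number for one document.

    Version 1 is created on initial consume; subsequent uploads increment.
    A racing writer that computes the same number would hit the DB-level
    UniqueConstraint (documents_documentversion_doc_number_uniq) and fail.
    """
    return max(existing, default=0) + 1


print(next_version_number([]))      # 1
print(next_version_number([1, 2]))  # 3
```

In the real code the existing numbers would come from `doc.versions.values_list("version_number", flat=True)` via the `related_name="versions"` FK; that query is an assumption here, not part of the diff.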

@@ -184,7 +184,6 @@ class WriteBatch:
def add_or_update(
self,
document: Document,
effective_content: str | None = None,
) -> None:
"""
Add or update a document in the batch.
@@ -195,11 +194,9 @@ class WriteBatch:
Args:
document: Django Document instance to index
effective_content: Override document.content for indexing (used when
re-indexing with newer OCR text from document versions)
"""
self.remove(document.pk)
doc = self._backend._build_tantivy_doc(document, effective_content)
doc = self._backend._build_tantivy_doc(document)
self._writer.add_document(doc)
def remove(self, doc_id: int) -> None:
@@ -275,16 +272,9 @@ class TantivyBackend:
def _build_tantivy_doc(
self,
document: Document,
effective_content: str | None = None,
) -> tantivy.Document:
"""Build a tantivy Document from a Django Document instance.
``effective_content`` overrides ``document.content`` for indexing —
used when re-indexing a root document with a newer version's OCR text.
"""
content = (
effective_content if effective_content is not None else document.content
)
"""Build a tantivy Document from a Django Document instance."""
content = document.content
doc = tantivy.Document()
@@ -395,7 +385,6 @@ class TantivyBackend:
def add_or_update(
self,
document: Document,
effective_content: str | None = None,
) -> None:
"""
Add or update a single document with file locking.
@@ -405,11 +394,10 @@ class TantivyBackend:
Args:
document: Django Document instance to index
effective_content: Override document.content for indexing
"""
self._ensure_open()
with self.batch_update(lock_timeout=5.0) as batch:
batch.add_or_update(document, effective_content)
batch.add_or_update(document)
def remove(self, doc_id: int) -> None:
"""
@@ -805,10 +793,7 @@ class TantivyBackend:
try:
writer = new_index.writer()
for document in iter_wrapper(documents):
doc = self._build_tantivy_doc(
document,
document.get_effective_content(),
)
doc = self._build_tantivy_doc(document)
writer.add_document(doc)
writer.commit()
new_index.reload()

View File

@@ -92,8 +92,6 @@ if TYPE_CHECKING:
from collections.abc import Iterable
from django.db.models.query import QuerySet
from rest_framework.relations import ManyRelatedField
from rest_framework.relations import RelatedField
logger = logging.getLogger("paperless.serializers")
@@ -948,7 +946,6 @@ def _get_viewable_duplicates(
duplicates = Document.global_objects.filter(
Q(checksum__in=checksums) | Q(archive_checksum__in=checksums),
).exclude(pk=document.pk)
duplicates = duplicates.filter(root_document__isnull=True)
duplicates = duplicates.order_by("-created")
allowed = get_objects_for_user_owner_aware(
user,
@@ -966,11 +963,16 @@ class DuplicateDocumentSummarySerializer(serializers.Serializer):
class DocumentVersionInfoSerializer(serializers.Serializer):
id = serializers.IntegerField()
added = serializers.DateTimeField()
version_label = serializers.CharField(required=False, allow_null=True)
checksum = serializers.CharField(required=False, allow_null=True)
is_root = serializers.BooleanField()
id = serializers.IntegerField(read_only=True)
added = serializers.DateTimeField(read_only=True)
version_label = serializers.CharField(
required=False,
allow_null=True,
read_only=True,
)
checksum = serializers.CharField(required=False, allow_null=True, read_only=True)
version_number = serializers.IntegerField(read_only=True)
is_root = serializers.BooleanField(read_only=True)
class _DocumentVersionInfo(TypedDict):
@@ -978,6 +980,7 @@ class _DocumentVersionInfo(TypedDict):
added: datetime
version_label: str | None
checksum: str | None
version_number: int
is_root: bool
@@ -1001,9 +1004,6 @@ class DocumentSerializer(
duplicate_documents = SerializerMethodField()
notes = NotesSerializer(many=True, required=False, read_only=True)
root_document: RelatedField[Document, Document, Any] | ManyRelatedField = (
serializers.PrimaryKeyRelatedField(read_only=True)
)
versions = SerializerMethodField()
custom_fields = CustomFieldInstanceSerializer(
@@ -1039,41 +1039,41 @@ class DocumentSerializer(
return list(duplicates.values("id", "title", "deleted_at"))
@extend_schema_field(DocumentVersionInfoSerializer(many=True))
def get_versions(self, obj):
root_doc = obj if obj.root_document_id is None else obj.root_document
if root_doc is None:
return []
def get_versions(self, obj: Document):
prefetched_cache = getattr(obj, "_prefetched_objects_cache", None)
prefetched_versions = (
prefetched = (
prefetched_cache.get("versions")
if isinstance(prefetched_cache, dict)
else None
)
versions: list[Document]
if prefetched_versions is not None:
versions = [*prefetched_versions, root_doc]
if prefetched is not None:
versions: list = list(prefetched)
else:
versions_qs = Document.objects.filter(root_document=root_doc).only(
"id",
"added",
"checksum",
"version_label",
)
versions = [*versions_qs, root_doc]
from documents.models import DocumentVersion
def build_info(doc: Document) -> _DocumentVersionInfo:
versions = list(
DocumentVersion.objects.filter(document=obj).only(
"id",
"added",
"checksum",
"version_label",
"version_number",
),
)
def build_info(v: DocumentVersion) -> _DocumentVersionInfo:
return {
"id": doc.id,
"added": doc.added,
"version_label": doc.version_label,
"checksum": doc.checksum,
"is_root": doc.id == root_doc.id,
"id": v.id,
"added": v.added,
"version_label": v.version_label,
"checksum": v.checksum,
"version_number": v.version_number,
"is_root": v.version_number == 1,
}
info = [build_info(doc) for doc in versions]
info.sort(key=lambda item: item["id"], reverse=True)
info = [build_info(v) for v in versions]
info.sort(key=lambda item: item["version_number"], reverse=True)
return info
def get_original_file_name(self, obj) -> str | None:
@@ -1087,10 +1087,8 @@ class DocumentSerializer(
def to_representation(self, instance):
doc = super().to_representation(instance)
if "content" in self.fields and hasattr(instance, "effective_content"):
doc["content"] = getattr(instance, "effective_content") or ""
if self.truncate_content and "content" in self.fields:
doc["content"] = doc.get("content")[0:550]
doc["content"] = (doc.get("content") or "")[0:550]
return doc
def to_internal_value(self, data):
@@ -1250,7 +1248,6 @@ class DocumentSerializer(
"remove_inbox_tags",
"page_count",
"mime_type",
"root_document",
"versions",
)
list_serializer_class = OwnedObjectListSerializer

View File
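A plain-Python sketch of the new `get_versions()` response shape: entries are built from `DocumentVersion` rows, sorted by `version_number` descending, and `is_root` now means "this is version 1" rather than "this row is the root `Document`". The dicts stand in for model instances.

```python
def build_version_infos(rows: list[dict]) -> list[dict]:
    # Mirrors build_info() plus the final sort in the serializer diff.
    infos = [
        {
            "id": r["id"],
            "version_number": r["version_number"],
            "is_root": r["version_number"] == 1,
        }
        for r in rows
    ]
    infos.sort(key=lambda item: item["version_number"], reverse=True)
    return infos


infos = build_version_infos(
    [{"id": 10, "version_number": 1}, {"id": 11, "version_number": 2}],
)
print(infos[0]["version_number"], infos[-1]["is_root"])  # 2 True
```

Note the `id` emitted here is the `DocumentVersion` primary key, which per the model docstring is what the versions API now accepts as the version ID.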

@@ -654,16 +654,6 @@ def update_filename_and_move_files(
root=settings.ARCHIVE_DIR,
)
# Keep version files in sync with root
if instance.root_document_id is None:
for version_doc in Document.objects.filter(root_document_id=instance.pk).only(
"pk",
):
update_filename_and_move_files(
Document,
version_doc,
)
@shared_task
def process_cf_select_update(custom_field: CustomField) -> None:
@@ -792,10 +782,7 @@ def cleanup_user_deletion(sender, instance: User | Group, **kwargs) -> None:
def add_to_index(sender, document, **kwargs) -> None:
from documents.search import get_backend
get_backend().add_or_update(
document,
effective_content=document.get_effective_content(),
)
get_backend().add_or_update(document)
def run_workflows_added(
@@ -870,13 +857,6 @@ def run_workflows(
use_overrides = overrides is not None
if isinstance(document, Document) and document.root_document_id is not None:
logger.debug(
"Skipping workflow execution for version document %s",
document.pk,
)
return None
if original_file is None:
original_file = (
document.source_path if not use_overrides else document.original_file

View File

@@ -476,19 +476,16 @@ def check_scheduled_workflows() -> None:
match trigger.schedule_date_field:
case WorkflowTrigger.ScheduleDateField.ADDED:
documents = Document.objects.filter(
root_document__isnull=True,
added__lte=threshold,
)
case WorkflowTrigger.ScheduleDateField.CREATED:
documents = Document.objects.filter(
root_document__isnull=True,
created__lte=threshold,
)
case WorkflowTrigger.ScheduleDateField.MODIFIED:
documents = Document.objects.filter(
root_document__isnull=True,
modified__lte=threshold,
)
@@ -529,7 +526,6 @@ def check_scheduled_workflows() -> None:
]
documents = Document.objects.filter(
root_document__isnull=True,
id__in=matched_ids,
)

View File

@@ -10,6 +10,7 @@ from factory.django import DjangoModelFactory
from documents.models import Correspondent
from documents.models import Document
from documents.models import DocumentType
from documents.models import DocumentVersion
from documents.models import MatchingModel
from documents.models import StoragePath
from documents.models import Tag
@@ -65,3 +66,32 @@ class DocumentFactory(DjangoModelFactory):
correspondent = None
document_type = None
storage_path = None
@factory.post_generation
def with_version(self, create, extracted, **kwargs):
"""Create an initial DocumentVersion(version_number=1) matching the Document's fields."""
if not create or not extracted:
return
DocumentVersion.objects.create(
document=self,
version_number=1,
checksum=self.checksum or "default",
archive_checksum=self.archive_checksum,
content=self.content,
page_count=self.page_count,
mime_type=self.mime_type or "application/pdf",
original_filename=self.original_filename,
filename=self.filename,
archive_filename=self.archive_filename,
added=self.added,
)
class DocumentVersionFactory(DjangoModelFactory):
class Meta:
model = DocumentVersion
document = factory.SubFactory(DocumentFactory)
version_number = factory.Sequence(lambda n: n + 1)
checksum = factory.Faker("sha256")
mime_type = "application/pdf"

View File

@@ -6,8 +6,8 @@ pytestmark = pytest.mark.search
class TestMigrateFulltextQueryFieldPrefixes(TestMigrations):
migrate_from = "0016_sha256_checksums"
migrate_to = "0017_migrate_fulltext_query_field_prefixes"
migrate_from = "0015_sha256_checksums"
migrate_to = "0016_migrate_fulltext_query_field_prefixes"
def setUpBeforeMigration(self, apps) -> None:
User = apps.get_model("auth", "User")

View File

@@ -1,14 +1,12 @@
from __future__ import annotations
from typing import TYPE_CHECKING
from unittest import TestCase
from unittest import mock
from auditlog.models import LogEntry # type: ignore[import-untyped]
from django.contrib.auth.models import Permission
from django.contrib.auth.models import User
from django.contrib.contenttypes.models import ContentType
from django.core.exceptions import FieldError
from django.core.files.uploadedfile import SimpleUploadedFile
from rest_framework import status
from rest_framework.test import APITestCase
@@ -17,6 +15,7 @@ from documents.data_models import DocumentSource
from documents.filters import EffectiveContentFilter
from documents.filters import TitleContentFilter
from documents.models import Document
from documents.models import DocumentVersion
from documents.tests.utils import DirectoriesMixin
if TYPE_CHECKING:
@@ -41,79 +40,50 @@ class TestDocumentVersioningApi(DirectoriesMixin, APITestCase):
path.parent.mkdir(parents=True, exist_ok=True)
path.write_bytes(content)
def _create_pdf(
self,
*,
title: str,
checksum: str,
root_document: Document | None = None,
) -> Document:
def _create_doc(self, *, title: str, checksum: str) -> Document:
doc = Document.objects.create(
title=title,
checksum=checksum,
mime_type="application/pdf",
root_document=root_document,
)
self._write_file(doc.source_path, b"pdf")
self._write_file(doc.thumbnail_path, b"thumb")
return doc
def test_root_endpoint_returns_root_for_version_and_root(self) -> None:
root = Document.objects.create(
title="root",
checksum="root",
mime_type="application/pdf",
)
version = Document.objects.create(
title="v1",
checksum="v1",
mime_type="application/pdf",
root_document=root,
)
resp_root = self.client.get(f"/api/documents/{root.id}/root/")
self.assertEqual(resp_root.status_code, status.HTTP_200_OK)
self.assertEqual(resp_root.data["root_id"], root.id)
resp_version = self.client.get(f"/api/documents/{version.id}/root/")
self.assertEqual(resp_version.status_code, status.HTTP_200_OK)
self.assertEqual(resp_version.data["root_id"], root.id)
def test_root_endpoint_returns_404_for_missing_document(self) -> None:
resp = self.client.get("/api/documents/9999/root/")
self.assertEqual(resp.status_code, status.HTTP_404_NOT_FOUND)
def test_root_endpoint_returns_403_when_user_lacks_permission(self) -> None:
owner = User.objects.create_user(username="owner")
viewer = User.objects.create_user(username="viewer")
viewer.user_permissions.add(
Permission.objects.get(codename="view_document"),
)
root = Document.objects.create(
title="root",
checksum="root",
mime_type="application/pdf",
owner=owner,
)
self.client.force_authenticate(user=viewer)
resp = self.client.get(f"/api/documents/{root.id}/root/")
self.assertEqual(resp.status_code, status.HTTP_403_FORBIDDEN)
def test_delete_version_disallows_deleting_root(self) -> None:
def _create_version(
self,
doc: Document,
*,
version_number: int,
checksum: str,
) -> DocumentVersion:
v = DocumentVersion.objects.create(
document=doc,
version_number=version_number,
checksum=checksum,
mime_type="application/pdf",
filename=f"version_{doc.pk}_v{version_number}.pdf",
)
self._write_file(v.source_path, b"pdf")
self._write_file(v.thumbnail_path, b"thumb")
return v
def test_delete_version_disallows_deleting_last_version(self) -> None:
root = Document.objects.create(
title="root",
checksum="root",
mime_type="application/pdf",
)
version = self._create_version(root, version_number=1, checksum="v1")
with mock.patch("documents.search.get_backend"):
resp = self.client.delete(f"/api/documents/{root.id}/versions/{root.id}/")
resp = self.client.delete(
f"/api/documents/{root.id}/versions/{version.pk}/",
)
self.assertEqual(resp.status_code, status.HTTP_400_BAD_REQUEST)
self.assertTrue(Document.objects.filter(id=root.id).exists())
self.assertIn("only remaining version", resp.content.decode())
self.assertTrue(DocumentVersion.objects.filter(pk=version.pk).exists())
def test_delete_version_deletes_version_and_returns_current_version(self) -> None:
root = Document.objects.create(
@@ -122,38 +92,31 @@ class TestDocumentVersioningApi(DirectoriesMixin, APITestCase):
mime_type="application/pdf",
content="root-content",
)
v1 = Document.objects.create(
title="v1",
checksum="v1",
mime_type="application/pdf",
root_document=root,
content="v1-content",
)
v2 = Document.objects.create(
title="v2",
checksum="v2",
mime_type="application/pdf",
root_document=root,
content="v2-content",
)
v1 = self._create_version(root, version_number=1, checksum="v1")
v1.content = "v1-content"
v1.save(update_fields=["content"])
v2 = self._create_version(root, version_number=2, checksum="v2")
v2.content = "v2-content"
v2.save(update_fields=["content"])
with mock.patch("documents.search.get_backend"):
resp = self.client.delete(f"/api/documents/{root.id}/versions/{v2.id}/")
resp = self.client.delete(
f"/api/documents/{root.id}/versions/{v2.pk}/",
)
self.assertEqual(resp.status_code, status.HTTP_200_OK)
self.assertFalse(Document.objects.filter(id=v2.id).exists())
self.assertEqual(resp.data["current_version_id"], v1.id)
self.assertFalse(DocumentVersion.objects.filter(pk=v2.pk).exists())
self.assertEqual(resp.data["current_version_id"], v1.pk)
root.refresh_from_db()
self.assertEqual(root.content, "root-content")
self.assertEqual(root.content, "v1-content")
with mock.patch("documents.search.get_backend"):
resp = self.client.delete(f"/api/documents/{root.id}/versions/{v1.id}/")
resp = self.client.delete(
f"/api/documents/{root.id}/versions/{v1.pk}/",
)
self.assertEqual(resp.status_code, status.HTTP_200_OK)
self.assertFalse(Document.objects.filter(id=v1.id).exists())
self.assertEqual(resp.data["current_version_id"], root.id)
root.refresh_from_db()
self.assertEqual(root.content, "root-content")
self.assertEqual(resp.status_code, status.HTTP_400_BAD_REQUEST)
self.assertIn("only remaining version", resp.content.decode())
def test_delete_version_writes_audit_log_entry(self) -> None:
root = Document.objects.create(
@@ -161,17 +124,13 @@ class TestDocumentVersioningApi(DirectoriesMixin, APITestCase):
checksum="root",
mime_type="application/pdf",
)
version = Document.objects.create(
title="v1",
checksum="v1",
mime_type="application/pdf",
root_document=root,
)
version_id = version.id
self._create_version(root, version_number=1, checksum="v1")
v2 = self._create_version(root, version_number=2, checksum="v2")
version_pk = v2.pk
with mock.patch("documents.search.get_backend"):
resp = self.client.delete(
f"/api/documents/{root.id}/versions/{version_id}/",
f"/api/documents/{root.id}/versions/{version_pk}/",
)
self.assertEqual(resp.status_code, status.HTTP_200_OK)
@@ -193,10 +152,10 @@ class TestDocumentVersioningApi(DirectoriesMixin, APITestCase):
self.assertEqual(entry.action, LogEntry.Action.UPDATE)
self.assertEqual(
entry.changes,
{"Version Deleted": ["None", version_id]},
{"Version Deleted": ["None", version_pk]},
)
additional_data = entry.additional_data or {}
self.assertEqual(additional_data.get("version_id"), version_id)
self.assertEqual(additional_data.get("version_id"), version_pk)
def test_delete_version_returns_404_when_version_not_related(self) -> None:
root = Document.objects.create(
@@ -209,42 +168,20 @@ class TestDocumentVersioningApi(DirectoriesMixin, APITestCase):
checksum="other",
mime_type="application/pdf",
)
other_version = Document.objects.create(
title="other-v1",
other_v1 = self._create_version(
other_root,
version_number=1,
checksum="other-v1",
mime_type="application/pdf",
root_document=other_root,
)
self._create_version(other_root, version_number=2, checksum="other-v2")
with mock.patch("documents.search.get_backend"):
resp = self.client.delete(
f"/api/documents/{root.id}/versions/{other_version.id}/",
f"/api/documents/{root.id}/versions/{other_v1.pk}/",
)
self.assertEqual(resp.status_code, status.HTTP_404_NOT_FOUND)
def test_delete_version_accepts_version_id_as_root_parameter(self) -> None:
root = Document.objects.create(
title="root",
checksum="root",
mime_type="application/pdf",
)
version = Document.objects.create(
title="v1",
checksum="v1",
mime_type="application/pdf",
root_document=root,
)
with mock.patch("documents.search.get_backend"):
resp = self.client.delete(
f"/api/documents/{version.id}/versions/{version.id}/",
)
self.assertEqual(resp.status_code, status.HTTP_200_OK)
self.assertFalse(Document.objects.filter(id=version.id).exists())
self.assertEqual(resp.data["current_version_id"], root.id)
def test_delete_version_returns_404_when_root_missing(self) -> None:
resp = self.client.delete("/api/documents/9999/versions/123/")
@@ -256,22 +193,17 @@ class TestDocumentVersioningApi(DirectoriesMixin, APITestCase):
checksum="root",
mime_type="application/pdf",
)
version = Document.objects.create(
title="v1",
checksum="v1",
mime_type="application/pdf",
root_document=root,
)
self._create_version(root, version_number=1, checksum="v1")
v2 = self._create_version(root, version_number=2, checksum="v2")
with mock.patch("documents.search.get_backend") as mock_get_backend:
mock_backend = mock.MagicMock()
mock_get_backend.return_value = mock_backend
resp = self.client.delete(
f"/api/documents/{root.id}/versions/{version.id}/",
f"/api/documents/{root.id}/versions/{v2.pk}/",
)
self.assertEqual(resp.status_code, status.HTTP_200_OK)
mock_backend.remove.assert_called_once_with(version.pk)
mock_backend.add_or_update.assert_called_once()
self.assertEqual(mock_backend.add_or_update.call_args[0][0].id, root.id)
@@ -287,16 +219,11 @@ class TestDocumentVersioningApi(DirectoriesMixin, APITestCase):
mime_type="application/pdf",
owner=owner,
)
version = Document.objects.create(
title="v1",
checksum="v1",
mime_type="application/pdf",
root_document=root,
)
version = self._create_version(root, version_number=1, checksum="v1")
self.client.force_authenticate(user=other)
resp = self.client.delete(
f"/api/documents/{root.id}/versions/{version.id}/",
f"/api/documents/{root.id}/versions/{version.pk}/",
)
self.assertEqual(resp.status_code, status.HTTP_403_FORBIDDEN)
@@ -318,16 +245,14 @@ class TestDocumentVersioningApi(DirectoriesMixin, APITestCase):
checksum="root",
mime_type="application/pdf",
)
version = Document.objects.create(
title="v1",
checksum="v1",
mime_type="application/pdf",
root_document=root,
version_label="old",
)
# version_number=1 is considered the root version; use 2 for a non-root.
self._create_version(root, version_number=1, checksum="v1")
version = self._create_version(root, version_number=2, checksum="v2")
version.version_label = "old"
version.save(update_fields=["version_label"])
resp = self.client.patch(
f"/api/documents/{root.id}/versions/{version.id}/",
f"/api/documents/{root.id}/versions/{version.pk}/",
{"version_label": " Label 1 "},
format="json",
)
@@ -336,7 +261,7 @@ class TestDocumentVersioningApi(DirectoriesMixin, APITestCase):
version.refresh_from_db()
self.assertEqual(version.version_label, "Label 1")
self.assertEqual(resp.data["version_label"], "Label 1")
self.assertEqual(resp.data["id"], version.id)
self.assertEqual(resp.data["id"], version.pk)
self.assertFalse(resp.data["is_root"])
def test_update_version_label_clears_on_blank(self) -> None:
@@ -344,18 +269,20 @@ class TestDocumentVersioningApi(DirectoriesMixin, APITestCase):
title="root",
checksum="root",
mime_type="application/pdf",
version_label="Root Label",
)
version = self._create_version(root, version_number=1, checksum="v1")
version.version_label = "Root Label"
version.save(update_fields=["version_label"])
resp = self.client.patch(
f"/api/documents/{root.id}/versions/{root.id}/",
f"/api/documents/{root.id}/versions/{version.pk}/",
{"version_label": " "},
format="json",
)
self.assertEqual(resp.status_code, status.HTTP_200_OK)
root.refresh_from_db()
self.assertIsNone(root.version_label)
version.refresh_from_db()
self.assertIsNone(version.version_label)
self.assertIsNone(resp.data["version_label"])
self.assertTrue(resp.data["is_root"])
@@ -371,16 +298,11 @@ class TestDocumentVersioningApi(DirectoriesMixin, APITestCase):
mime_type="application/pdf",
owner=owner,
)
version = Document.objects.create(
title="v1",
checksum="v1",
mime_type="application/pdf",
root_document=root,
)
version = self._create_version(root, version_number=1, checksum="v1")
self.client.force_authenticate(user=other)
resp = self.client.patch(
f"/api/documents/{root.id}/versions/{version.id}/",
f"/api/documents/{root.id}/versions/{version.pk}/",
{"version_label": "Blocked"},
format="json",
)
@@ -398,15 +320,14 @@ class TestDocumentVersioningApi(DirectoriesMixin, APITestCase):
checksum="other",
mime_type="application/pdf",
)
other_version = Document.objects.create(
title="other-v1",
other_version = self._create_version(
other_root,
version_number=1,
checksum="other-v1",
mime_type="application/pdf",
root_document=other_root,
)
resp = self.client.patch(
f"/api/documents/{root.id}/versions/{other_version.id}/",
f"/api/documents/{root.id}/versions/{other_version.pk}/",
{"version_label": "Nope"},
format="json",
)
@@ -414,7 +335,7 @@ class TestDocumentVersioningApi(DirectoriesMixin, APITestCase):
self.assertEqual(resp.status_code, status.HTTP_404_NOT_FOUND)
def test_download_version_param_errors(self) -> None:
root = self._create_pdf(title="root", checksum="root")
root = self._create_doc(title="root", checksum="root")
resp = self.client.get(
f"/api/documents/{root.id}/download/?version=not-a-number",
@@ -424,41 +345,37 @@ class TestDocumentVersioningApi(DirectoriesMixin, APITestCase):
resp = self.client.get(f"/api/documents/{root.id}/download/?version=9999")
self.assertEqual(resp.status_code, status.HTTP_404_NOT_FOUND)
other_root = self._create_pdf(title="other", checksum="other")
other_version = self._create_pdf(
title="other-v1",
other_root = self._create_doc(title="other", checksum="other")
other_version = self._create_version(
other_root,
version_number=1,
checksum="other-v1",
root_document=other_root,
)
resp = self.client.get(
f"/api/documents/{root.id}/download/?version={other_version.id}",
f"/api/documents/{root.id}/download/?version={other_version.pk}",
)
self.assertEqual(resp.status_code, status.HTTP_404_NOT_FOUND)
def test_download_preview_thumb_with_version_param(self) -> None:
root = self._create_pdf(title="root", checksum="root")
version = self._create_pdf(
title="v1",
checksum="v1",
root_document=root,
)
root = self._create_doc(title="root", checksum="root")
version = self._create_version(root, version_number=1, checksum="v1")
self._write_file(version.source_path, b"version")
self._write_file(version.thumbnail_path, b"thumb")
resp = self.client.get(
f"/api/documents/{root.id}/download/?version={version.id}",
f"/api/documents/{root.id}/download/?version={version.pk}",
)
self.assertEqual(resp.status_code, status.HTTP_200_OK)
self.assertEqual(resp.content, b"version")
resp = self.client.get(
f"/api/documents/{root.id}/preview/?version={version.id}",
f"/api/documents/{root.id}/preview/?version={version.pk}",
)
self.assertEqual(resp.status_code, status.HTTP_200_OK)
self.assertEqual(resp.content, b"version")
resp = self.client.get(
f"/api/documents/{root.id}/thumb/?version={version.id}",
f"/api/documents/{root.id}/thumb/?version={version.pk}",
)
self.assertEqual(resp.status_code, status.HTTP_200_OK)
self.assertEqual(resp.content, b"thumb")
@@ -469,24 +386,19 @@ class TestDocumentVersioningApi(DirectoriesMixin, APITestCase):
checksum="root",
mime_type="application/pdf",
)
version = Document.objects.create(
title="v1",
checksum="v1",
mime_type="application/pdf",
root_document=root,
)
version = self._create_version(root, version_number=1, checksum="v1")
with mock.patch("documents.views.DocumentViewSet.get_metadata") as metadata:
metadata.return_value = []
resp = self.client.get(
f"/api/documents/{root.id}/metadata/?version={version.id}",
f"/api/documents/{root.id}/metadata/?version={version.pk}",
)
self.assertEqual(resp.status_code, status.HTTP_200_OK)
self.assertTrue(metadata.called)
def test_metadata_version_param_errors(self) -> None:
root = self._create_pdf(title="root", checksum="root")
root = self._create_doc(title="root", checksum="root")
resp = self.client.get(
f"/api/documents/{root.id}/metadata/?version=not-a-number",
@@ -496,14 +408,14 @@ class TestDocumentVersioningApi(DirectoriesMixin, APITestCase):
resp = self.client.get(f"/api/documents/{root.id}/metadata/?version=9999")
self.assertEqual(resp.status_code, status.HTTP_404_NOT_FOUND)
other_root = self._create_pdf(title="other", checksum="other")
other_version = self._create_pdf(
title="other-v1",
other_root = self._create_doc(title="other", checksum="other")
other_version = self._create_version(
other_root,
version_number=1,
checksum="other-v1",
root_document=other_root,
)
resp = self.client.get(
f"/api/documents/{root.id}/metadata/?version={other_version.id}",
f"/api/documents/{root.id}/metadata/?version={other_version.pk}",
)
self.assertEqual(resp.status_code, status.HTTP_404_NOT_FOUND)
@@ -553,39 +465,6 @@ class TestDocumentVersioningApi(DirectoriesMixin, APITestCase):
self.assertEqual(overrides.version_label, "New Version")
self.assertEqual(overrides.actor_id, self.user.id)
def test_update_version_with_version_pk_normalizes_to_root(self) -> None:
root = Document.objects.create(
title="root",
checksum="root",
mime_type="application/pdf",
)
version = Document.objects.create(
title="v1",
checksum="v1",
mime_type="application/pdf",
root_document=root,
)
upload = self._make_pdf_upload()
async_task = mock.Mock()
async_task.id = "task-123"
with mock.patch("documents.views.consume_file") as consume_mock:
consume_mock.delay.return_value = async_task
resp = self.client.post(
f"/api/documents/{version.id}/update_version/",
{"document": upload, "version_label": " New Version "},
format="multipart",
)
self.assertEqual(resp.status_code, status.HTTP_200_OK)
self.assertEqual(resp.data, "task-123")
consume_mock.delay.assert_called_once()
input_doc, overrides = consume_mock.delay.call_args[0]
self.assertEqual(input_doc.root_document_id, root.id)
self.assertEqual(overrides.version_label, "New Version")
self.assertEqual(overrides.actor_id, self.user.id)
def test_update_version_returns_500_on_consume_failure(self) -> None:
root = Document.objects.create(
title="root",
@@ -654,34 +533,28 @@ class TestDocumentVersioningApi(DirectoriesMixin, APITestCase):
mime_type="application/pdf",
content="root-content",
)
v1 = Document.objects.create(
title="v1",
checksum="v1",
mime_type="application/pdf",
root_document=root,
content="v1-content",
)
v2 = Document.objects.create(
title="v2",
checksum="v2",
mime_type="application/pdf",
root_document=root,
content="v2-content",
)
v1 = self._create_version(root, version_number=1, checksum="v1")
v1.content = "v1-content"
v1.save(update_fields=["content"])
v2 = self._create_version(root, version_number=2, checksum="v2")
v2.content = "v2-content"
v2.save(update_fields=["content"])
resp = self.client.patch(
f"/api/documents/{root.id}/",
{"content": "edited-content"},
format="json",
)
with mock.patch("documents.search.get_backend"):
resp = self.client.patch(
f"/api/documents/{root.id}/",
{"content": "edited-content"},
format="json",
)
self.assertEqual(resp.status_code, status.HTTP_200_OK)
self.assertEqual(resp.data["content"], "edited-content")
root.refresh_from_db()
v1.refresh_from_db()
v2.refresh_from_db()
root.refresh_from_db()
# The latest version (v2) and the Document cache are both updated.
self.assertEqual(v2.content, "edited-content")
self.assertEqual(root.content, "root-content")
self.assertEqual(root.content, "edited-content")
self.assertEqual(v1.content, "v1-content")
def test_patch_content_updates_selected_version_content(self) -> None:
@@ -691,55 +564,41 @@ class TestDocumentVersioningApi(DirectoriesMixin, APITestCase):
mime_type="application/pdf",
content="root-content",
)
v1 = Document.objects.create(
title="v1",
checksum="v1",
mime_type="application/pdf",
root_document=root,
content="v1-content",
)
v2 = Document.objects.create(
title="v2",
checksum="v2",
mime_type="application/pdf",
root_document=root,
content="v2-content",
)
v1 = self._create_version(root, version_number=1, checksum="v1")
v1.content = "v1-content"
v1.save(update_fields=["content"])
v2 = self._create_version(root, version_number=2, checksum="v2")
v2.content = "v2-content"
v2.save(update_fields=["content"])
resp = self.client.patch(
f"/api/documents/{root.id}/?version={v1.id}",
{"content": "edited-v1"},
format="json",
)
with mock.patch("documents.search.get_backend"):
resp = self.client.patch(
f"/api/documents/{root.id}/?version={v1.pk}",
{"content": "edited-v1"},
format="json",
)
self.assertEqual(resp.status_code, status.HTTP_200_OK)
self.assertEqual(resp.data["content"], "edited-v1")
root.refresh_from_db()
v1.refresh_from_db()
v2.refresh_from_db()
root.refresh_from_db()
self.assertEqual(v1.content, "edited-v1")
self.assertEqual(v2.content, "v2-content")
self.assertEqual(root.content, "root-content")
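The two PATCH tests above encode a small routing rule for content edits: without a `?version` param the edit lands on the latest version and refreshes the cached `Document.content`, while an edit targeting an older version leaves both the cache and the other versions alone. A framework-free sketch of that rule (`Doc`, `Version`, and `apply_content_edit` are hypothetical names for illustration, not the real serializer code):

```python
from dataclasses import dataclass, field


@dataclass
class Version:
    version_number: int
    content: str


@dataclass
class Doc:
    content: str  # cached copy of the latest version's content
    versions: list = field(default_factory=list)


def apply_content_edit(doc, new_content, version_number=None):
    """Mirror the tested behaviour: no version selected -> edit the
    latest version; only refresh doc.content when the edited version
    is the latest one."""
    latest = max(v.version_number for v in doc.versions)
    target_number = latest if version_number is None else version_number
    target = next(v for v in doc.versions if v.version_number == target_number)
    target.content = new_content
    if target.version_number == latest:
        doc.content = new_content
    return target
```

With this sketch, editing without a version number touches v2 and the cache; editing v1 explicitly leaves the cache at the v2 value, matching the assertions above.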
def test_retrieve_returns_latest_version_content(self) -> None:
def test_retrieve_returns_document_content(self) -> None:
root = Document.objects.create(
title="root",
checksum="root",
mime_type="application/pdf",
content="root-content",
)
Document.objects.create(
title="v1",
checksum="v1",
mime_type="application/pdf",
root_document=root,
content="v1-content",
content="latest-content",
)
resp = self.client.get(f"/api/documents/{root.id}/")
self.assertEqual(resp.status_code, status.HTTP_200_OK)
self.assertEqual(resp.data["content"], "v1-content")
self.assertEqual(resp.data["content"], "latest-content")
def test_retrieve_with_version_param_returns_selected_version_content(self) -> None:
root = Document.objects.create(
@@ -748,51 +607,37 @@ class TestDocumentVersioningApi(DirectoriesMixin, APITestCase):
mime_type="application/pdf",
content="root-content",
)
v1 = Document.objects.create(
title="v1",
checksum="v1",
mime_type="application/pdf",
root_document=root,
content="v1-content",
)
v1 = self._create_version(root, version_number=1, checksum="v1")
v1.content = "v1-content"
v1.save(update_fields=["content"])
resp = self.client.get(f"/api/documents/{root.id}/?version={v1.id}")
resp = self.client.get(f"/api/documents/{root.id}/?version={v1.pk}")
self.assertEqual(resp.status_code, status.HTTP_200_OK)
self.assertEqual(resp.data["content"], "v1-content")
class TestVersionAwareFilters(TestCase):
def test_title_content_filter_falls_back_to_content(self) -> None:
class TestVersionAwareFilters:
def test_title_content_filter_queries_content_directly(self) -> None:
queryset = mock.Mock()
fallback_queryset = mock.Mock()
queryset.filter.side_effect = [FieldError("missing field"), fallback_queryset]
result = TitleContentFilter().filter(queryset, " latest ")
TitleContentFilter().filter(queryset, " latest ")
self.assertIs(result, fallback_queryset)
self.assertEqual(queryset.filter.call_count, 2)
assert queryset.filter.call_count == 1
def test_effective_content_filter_falls_back_to_content_lookup(self) -> None:
def test_effective_content_filter_queries_content_directly(self) -> None:
queryset = mock.Mock()
fallback_queryset = mock.Mock()
queryset.filter.side_effect = [FieldError("missing field"), fallback_queryset]
result = EffectiveContentFilter(lookup_expr="icontains").filter(
queryset,
" latest ",
)
EffectiveContentFilter(lookup_expr="icontains").filter(queryset, " latest ")
self.assertIs(result, fallback_queryset)
first_kwargs = queryset.filter.call_args_list[0].kwargs
second_kwargs = queryset.filter.call_args_list[1].kwargs
self.assertEqual(first_kwargs, {"effective_content__icontains": "latest"})
self.assertEqual(second_kwargs, {"content__icontains": "latest"})
assert queryset.filter.call_count == 1
kwargs = queryset.filter.call_args_list[0].kwargs
assert kwargs == {"content__icontains": "latest"}
def test_effective_content_filter_returns_input_for_empty_values(self) -> None:
queryset = mock.Mock()
result = EffectiveContentFilter(lookup_expr="icontains").filter(queryset, " ")
self.assertIs(result, queryset)
assert result is queryset
queryset.filter.assert_not_called()
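The rewritten filter tests above drop the old `FieldError` fallback path: the filters now strip the value and query `content` directly, issuing exactly one `filter()` call. A minimal, framework-free sketch of that behaviour (class and helper names here are stand-ins, not the real `documents.filters` code):

```python
class EffectiveContentFilter:
    """Hypothetical stand-in showing the behaviour the tests assert:
    strip the value, filter once on content__<lookup_expr>, and pass
    blank values through untouched."""

    def __init__(self, lookup_expr="icontains"):
        self.lookup_expr = lookup_expr

    def filter(self, queryset, value):
        # Blank or whitespace-only values leave the queryset untouched.
        if value is None or not value.strip():
            return queryset
        # One direct query on content -- no effective_content fallback.
        return queryset.filter(**{f"content__{self.lookup_expr}": value.strip()})


class RecordingQuerySet:
    """Tiny fake that captures filter() kwargs, standing in for mock.Mock()."""

    def __init__(self):
        self.calls = []

    def filter(self, **kwargs):
        self.calls.append(kwargs)
        return self
```

Here `EffectiveContentFilter().filter(qs, " latest ")` produces a single `content__icontains="latest"` call, which is exactly what the updated `call_count == 1` assertions pin down.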


@@ -36,6 +36,7 @@ from documents.models import CustomField
from documents.models import CustomFieldInstance
from documents.models import Document
from documents.models import DocumentType
from documents.models import DocumentVersion
from documents.models import MatchingModel
from documents.models import Note
from documents.models import SavedView
@@ -316,10 +317,17 @@ class TestDocumentApi(DirectoriesMixin, DocumentConsumeDelayMixin, APITestCase):
filename=Path(filename).name,
mime_type="application/pdf",
)
dv = DocumentVersion.objects.create(
document=doc,
version_number=1,
checksum=doc.checksum or "default",
mime_type=doc.mime_type,
filename=doc.filename,
)
if TYPE_CHECKING:
assert isinstance(self.dirs.thumbnail_dir, Path), self.dirs.thumbnail_dir
with (self.dirs.thumbnail_dir / f"{doc.pk:07d}.webp").open("wb") as f:
with dv.thumbnail_path.open("wb") as f:
f.write(content_thumbnail)
response = self.client.get(f"/api/documents/{doc.pk}/download/")
@@ -369,8 +377,15 @@ class TestDocumentApi(DirectoriesMixin, DocumentConsumeDelayMixin, APITestCase):
mime_type="application/pdf",
owner=user1,
)
dv = DocumentVersion.objects.create(
document=doc,
version_number=1,
checksum=doc.checksum or "default",
mime_type=doc.mime_type,
filename=doc.filename,
)
with (Path(self.dirs.thumbnail_dir) / f"{doc.pk:07d}.webp").open("wb") as f:
with dv.thumbnail_path.open("wb") as f:
f.write(content_thumbnail)
response = self.client.get(f"/api/documents/{doc.pk}/download/")
@@ -404,6 +419,14 @@ class TestDocumentApi(DirectoriesMixin, DocumentConsumeDelayMixin, APITestCase):
archive_filename="archived.pdf",
mime_type="application/pdf",
)
DocumentVersion.objects.create(
document=doc,
version_number=1,
checksum=doc.checksum or "default",
mime_type=doc.mime_type,
filename=doc.filename,
archive_filename=doc.archive_filename,
)
with Path(doc.source_path).open("wb") as f:
f.write(content)
@@ -446,6 +469,14 @@ class TestDocumentApi(DirectoriesMixin, DocumentConsumeDelayMixin, APITestCase):
archive_filename="archived.pdf",
mime_type="application/pdf",
)
DocumentVersion.objects.create(
document=doc,
version_number=1,
checksum=doc.checksum or "default",
mime_type=doc.mime_type,
filename=doc.filename,
archive_filename=doc.archive_filename,
)
with Path(doc.source_path).open("wb") as f:
f.write(content)
@@ -585,16 +616,21 @@ class TestDocumentApi(DirectoriesMixin, DocumentConsumeDelayMixin, APITestCase):
mime_type="application/pdf",
owner=self.user,
)
version_doc = Document.objects.create(
title="Version",
DocumentVersion.objects.create(
document=root_doc,
version_number=1,
checksum="123",
mime_type="application/pdf",
)
v2 = DocumentVersion.objects.create(
document=root_doc,
version_number=2,
checksum="456",
mime_type="application/pdf",
root_document=root_doc,
owner=self.user,
)
response = self.client.delete(
f"/api/documents/{root_doc.pk}/versions/{version_doc.pk}/",
f"/api/documents/{root_doc.pk}/versions/{v2.pk}/",
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
@@ -605,7 +641,7 @@ class TestDocumentApi(DirectoriesMixin, DocumentConsumeDelayMixin, APITestCase):
self.assertEqual(response.data[0]["action"], "update")
self.assertEqual(
response.data[0]["changes"],
{"Version Deleted": ["None", version_doc.pk]},
{"Version Deleted": ["None", v2.pk]},
)
@override_settings(AUDIT_LOG_ENABLED=False)
@@ -1452,17 +1488,24 @@ class TestDocumentApi(DirectoriesMixin, DocumentConsumeDelayMixin, APITestCase):
)
def test_document_filters_use_latest_version_content(self) -> None:
root = Document.objects.create(
doc = Document.objects.create(
title="versioned root",
checksum="root",
checksum="v2",
mime_type="application/pdf",
content="root-content",
content="latest-version-content",
)
version = Document.objects.create(
title="versioned root",
DocumentVersion.objects.create(
document=doc,
version_number=1,
checksum="v1",
mime_type="application/pdf",
root_document=root,
content="old-content",
)
DocumentVersion.objects.create(
document=doc,
version_number=2,
checksum="v2",
mime_type="application/pdf",
content="latest-version-content",
)
@@ -1472,8 +1515,8 @@ class TestDocumentApi(DirectoriesMixin, DocumentConsumeDelayMixin, APITestCase):
self.assertEqual(response.status_code, status.HTTP_200_OK)
results = response.data["results"]
self.assertEqual(len(results), 1)
self.assertEqual(results[0]["id"], root.id)
self.assertEqual(results[0]["content"], version.content)
self.assertEqual(results[0]["id"], doc.id)
self.assertEqual(results[0]["content"], "latest-version-content")
response = self.client.get(
"/api/documents/?title_content=latest-version-content",
@@ -1481,7 +1524,7 @@ class TestDocumentApi(DirectoriesMixin, DocumentConsumeDelayMixin, APITestCase):
self.assertEqual(response.status_code, status.HTTP_200_OK)
results = response.data["results"]
self.assertEqual(len(results), 1)
self.assertEqual(results[0]["id"], root.id)
self.assertEqual(results[0]["id"], doc.id)
def test_create_wrong_endpoint(self) -> None:
response = self.client.post(
@@ -2042,6 +2085,15 @@ class TestDocumentApi(DirectoriesMixin, DocumentConsumeDelayMixin, APITestCase):
archive_checksum="A",
archive_filename="archive.pdf",
)
DocumentVersion.objects.create(
document=doc,
version_number=1,
checksum=doc.checksum or "default",
mime_type=doc.mime_type,
filename=doc.filename,
archive_filename=doc.archive_filename,
archive_checksum=doc.archive_checksum,
)
source_file: Path = (
Path(__file__).parent
@@ -2082,6 +2134,13 @@ class TestDocumentApi(DirectoriesMixin, DocumentConsumeDelayMixin, APITestCase):
filename="file.pdf",
mime_type="application/pdf",
)
DocumentVersion.objects.create(
document=doc,
version_number=1,
checksum=doc.checksum or "default",
mime_type=doc.mime_type,
filename=doc.filename,
)
shutil.copy(Path(__file__).parent / "samples" / "simple.pdf", doc.source_path)
@@ -2105,6 +2164,15 @@ class TestDocumentApi(DirectoriesMixin, DocumentConsumeDelayMixin, APITestCase):
archive_checksum="B",
checksum="A",
)
DocumentVersion.objects.create(
document=doc,
version_number=1,
checksum=doc.checksum,
mime_type=doc.mime_type,
filename=doc.filename,
archive_filename=doc.archive_filename,
archive_checksum=doc.archive_checksum,
)
response = self.client.get(f"/api/documents/{doc.pk}/metadata/")
self.assertEqual(response.status_code, status.HTTP_200_OK)


@@ -1,474 +0,0 @@
from unittest import mock
from allauth.mfa.models import Authenticator
from allauth.socialaccount.models import SocialAccount
from allauth.socialaccount.models import SocialApp
from django.contrib.auth.models import User
from rest_framework import status
from rest_framework.authtoken.models import Token
from rest_framework.test import APITestCase
from documents.tests.utils import DirectoriesMixin
# see allauth.socialaccount.providers.openid.provider.OpenIDProvider
class MockOpenIDProvider:
id = "openid"
name = "OpenID"
def get_brands(self):
default_servers = [
dict(id="yahoo", name="Yahoo", openid_url="http://me.yahoo.com"),
dict(id="hyves", name="Hyves", openid_url="http://hyves.nl"),
]
return default_servers
def get_login_url(self, request, **kwargs):
return "openid/login/"
# see allauth.socialaccount.providers.openid_connect.provider.OpenIDConnectProviderAccount
class MockOpenIDConnectProviderAccount:
def __init__(self, mock_social_account_dict) -> None:
self.account = mock_social_account_dict
def to_str(self):
return self.account["name"]
# see allauth.socialaccount.providers.openid_connect.provider.OpenIDConnectProvider
class MockOpenIDConnectProvider:
id = "openid_connect"
name = "OpenID Connect"
def __init__(self, app=None) -> None:
self.app = app
self.name = app.name
def get_login_url(self, request, **kwargs):
return f"{self.app.provider_id}/login/?process=connect"
class TestApiProfile(DirectoriesMixin, APITestCase):
ENDPOINT = "/api/profile/"
def setUp(self) -> None:
super().setUp()
self.user = User.objects.create_superuser(
username="temp_admin",
first_name="firstname",
last_name="surname",
)
self.client.force_authenticate(user=self.user)
def setupSocialAccount(self) -> None:
SocialApp.objects.create(
name="Keycloak",
provider="openid_connect",
provider_id="keycloak-test",
)
self.user.socialaccount_set.add(
SocialAccount(uid="123456789", provider="keycloak-test"),
bulk=False,
)
def test_get_profile(self) -> None:
"""
GIVEN:
- Configured user
WHEN:
- API call is made to get profile
THEN:
- Profile is returned
"""
response = self.client.get(self.ENDPOINT)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(response.data["email"], self.user.email)
self.assertEqual(response.data["first_name"], self.user.first_name)
self.assertEqual(response.data["last_name"], self.user.last_name)
@mock.patch(
"allauth.socialaccount.models.SocialAccount.get_provider_account",
)
@mock.patch(
"allauth.socialaccount.adapter.DefaultSocialAccountAdapter.list_providers",
)
def test_get_profile_w_social(
self,
mock_list_providers,
mock_get_provider_account,
) -> None:
"""
GIVEN:
- Configured user and setup social account
WHEN:
- API call is made to get profile
THEN:
- Profile is returned with social accounts
"""
self.setupSocialAccount()
openid_provider = MockOpenIDConnectProvider(
app=SocialApp.objects.get(provider_id="keycloak-test"),
)
mock_list_providers.return_value = [
openid_provider,
]
mock_get_provider_account.return_value = MockOpenIDConnectProviderAccount(
mock_social_account_dict={
"name": openid_provider.name,
},
)
)
response = self.client.get(self.ENDPOINT)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(
response.data["social_accounts"],
[
{
"id": 1,
"provider": "keycloak-test",
"name": "Keycloak",
},
],
)
def test_profile_w_social_removed_app(self) -> None:
"""
GIVEN:
- Configured user and setup social account
- Social app has been removed
WHEN:
- API call is made to get profile
THEN:
- Profile is returned with "Unknown App" as name
"""
self.setupSocialAccount()
# Remove the social app
SocialApp.objects.get(provider_id="keycloak-test").delete()
response = self.client.get(self.ENDPOINT)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(
response.data["social_accounts"],
[
{
"id": 1,
"provider": "keycloak-test",
"name": "Unknown App",
},
],
)
def test_update_profile(self) -> None:
"""
GIVEN:
- Configured user
WHEN:
- API call is made to update profile
THEN:
- Profile is updated
"""
user_data = {
"email": "new@email.com",
"password": "superpassword1234",
"first_name": "new first name",
"last_name": "new last name",
}
response = self.client.patch(self.ENDPOINT, user_data)
self.assertEqual(response.status_code, status.HTTP_200_OK)
user = User.objects.get(username=self.user.username)
self.assertTrue(user.check_password(user_data["password"]))
self.assertEqual(user.email, user_data["email"])
self.assertEqual(user.first_name, user_data["first_name"])
self.assertEqual(user.last_name, user_data["last_name"])
def test_update_profile_invalid_password_returns_field_error(self) -> None:
"""
GIVEN:
- Configured user
WHEN:
- API call is made to update profile with weak password
THEN:
- Profile update fails with password field error
"""
user_data = {
"email": "new@email.com",
"password": "short", # shorter than default validator threshold
"first_name": "new first name",
"last_name": "new last name",
}
response = self.client.patch(self.ENDPOINT, user_data)
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
self.assertIn("password", response.data)
self.assertIsInstance(response.data["password"], list)
self.assertTrue(
any(
"too short" in message.lower() for message in response.data["password"]
),
)
def test_update_profile_placeholder_password_skips_validation(self) -> None:
"""
GIVEN:
- Configured user with existing password
WHEN:
- API call is made with the obfuscated placeholder password value
THEN:
- Profile is updated without changing the password or running validators
"""
original_password = "orig-pass-12345"
self.user.set_password(original_password)
self.user.save()
user_data = {
"email": "new@email.com",
"password": "*" * 12, # matches obfuscated value from serializer
"first_name": "new first name",
"last_name": "new last name",
}
response = self.client.patch(self.ENDPOINT, user_data)
self.assertEqual(response.status_code, status.HTTP_200_OK)
user = User.objects.get(username=self.user.username)
self.assertTrue(user.check_password(original_password))
self.assertEqual(user.email, user_data["email"])
self.assertEqual(user.first_name, user_data["first_name"])
self.assertEqual(user.last_name, user_data["last_name"])
def test_update_auth_token(self) -> None:
"""
GIVEN:
- Configured user
WHEN:
- API call is made to generate auth token
THEN:
- Token is created the first time, updated the second
"""
self.assertEqual(len(Token.objects.all()), 0)
response = self.client.post(f"{self.ENDPOINT}generate_auth_token/")
self.assertEqual(response.status_code, status.HTTP_200_OK)
token1 = Token.objects.filter(user=self.user).first()
self.assertIsNotNone(token1)
response = self.client.post(f"{self.ENDPOINT}generate_auth_token/")
self.assertEqual(response.status_code, status.HTTP_200_OK)
token2 = Token.objects.filter(user=self.user).first()
self.assertNotEqual(token1.key, token2.key)
def test_profile_not_logged_in(self) -> None:
"""
GIVEN:
- User not logged in
WHEN:
- API call is made to get profile and update token
THEN:
- Both requests are rejected as unauthorized (HTTP 401)
"""
self.client.logout()
response = self.client.get(self.ENDPOINT)
self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)
response = self.client.post(f"{self.ENDPOINT}generate_auth_token/")
self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)
@mock.patch(
"allauth.socialaccount.adapter.DefaultSocialAccountAdapter.list_providers",
)
def test_get_social_account_providers(
self,
mock_list_providers,
) -> None:
"""
GIVEN:
- Configured user
WHEN:
- API call is made to get social account providers
THEN:
- Social account providers are returned
"""
self.setupSocialAccount()
mock_list_providers.return_value = [
MockOpenIDConnectProvider(
app=SocialApp.objects.get(provider_id="keycloak-test"),
),
]
response = self.client.get(f"{self.ENDPOINT}social_account_providers/")
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(
response.data[0]["name"],
"Keycloak",
)
self.assertIn(
"keycloak-test/login/?process=connect",
response.data[0]["login_url"],
)
@mock.patch(
"allauth.socialaccount.adapter.DefaultSocialAccountAdapter.list_providers",
)
def test_get_social_account_providers_openid(
self,
mock_list_providers,
) -> None:
"""
GIVEN:
- Configured user and openid social account provider
WHEN:
- API call is made to get social account providers
THEN:
- Brands for openid provider are returned
"""
mock_list_providers.return_value = [
MockOpenIDProvider(),
]
response = self.client.get(f"{self.ENDPOINT}social_account_providers/")
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(
len(response.data),
2,
)
def test_disconnect_social_account(self) -> None:
"""
GIVEN:
- Configured user
WHEN:
- API call is made to disconnect a social account
THEN:
- Social account is deleted from the user or request fails
"""
self.setupSocialAccount()
# Test with invalid id
response = self.client.post(
f"{self.ENDPOINT}disconnect_social_account/",
{"id": -1},
)
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
# Test with valid id
social_account_id = self.user.socialaccount_set.all()[0].pk
response = self.client.post(
f"{self.ENDPOINT}disconnect_social_account/",
{"id": social_account_id},
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(response.data, social_account_id)
self.assertEqual(
len(self.user.socialaccount_set.filter(pk=social_account_id)),
0,
)
class TestApiTOTPViews(APITestCase):
ENDPOINT = "/api/profile/totp/"
def setUp(self) -> None:
super().setUp()
self.user = User.objects.create_superuser(username="temp_admin")
self.client.force_authenticate(user=self.user)
def test_get_totp(self) -> None:
"""
GIVEN:
- Existing user account
WHEN:
- API request is made to TOTP endpoint
THEN:
- TOTP is generated
"""
response = self.client.get(
self.ENDPOINT,
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertIn("qr_svg", response.data)
self.assertIn("secret", response.data)
@mock.patch("allauth.mfa.totp.internal.auth.validate_totp_code")
def test_activate_totp(self, mock_validate_totp_code) -> None:
"""
GIVEN:
- Existing user account
WHEN:
- API request is made to activate TOTP
THEN:
- TOTP is activated, recovery codes are returned
"""
mock_validate_totp_code.return_value = True
response = self.client.post(
self.ENDPOINT,
data={
"secret": "123",
"code": "456",
},
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertTrue(Authenticator.objects.filter(user=self.user).exists())
self.assertIn("recovery_codes", response.data)
def test_deactivate_totp(self) -> None:
"""
GIVEN:
- Existing user account with TOTP enabled
WHEN:
- API request is made to deactivate TOTP
THEN:
- TOTP is deactivated
"""
Authenticator.objects.create(
user=self.user,
type=Authenticator.Type.TOTP,
data={},
)
response = self.client.delete(
self.ENDPOINT,
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(Authenticator.objects.filter(user=self.user).count(), 0)
# A second delete fails because no TOTP authenticator remains
response = self.client.delete(
self.ENDPOINT,
)
self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)


@@ -381,52 +381,20 @@ class TestBulkEdit(DirectoriesMixin, TestCase):
[self.doc3.id, self.doc4.id, self.doc5.id],
)
def test_delete_root_document_deletes_all_versions(self) -> None:
version = Document.objects.create(
checksum="A-v1",
title="A version",
root_document=self.doc1,
def test_delete_document_deletes_document_versions_via_cascade(self) -> None:
from documents.models import DocumentVersion
v1 = DocumentVersion.objects.create(
document=self.doc1,
version_number=1,
checksum="A",
mime_type="application/pdf",
)
bulk_edit.delete([self.doc1.id])
self.assertFalse(Document.objects.filter(id=self.doc1.id).exists())
self.assertFalse(Document.objects.filter(id=version.id).exists())
def test_delete_version_document_keeps_root(self) -> None:
version = Document.objects.create(
checksum="A-v1",
title="A version",
root_document=self.doc1,
)
bulk_edit.delete([version.id])
self.assertTrue(Document.objects.filter(id=self.doc1.id).exists())
self.assertFalse(Document.objects.filter(id=version.id).exists())
def test_resolve_root_and_source_doc_latest_version_prefers_newest_version(
self,
) -> None:
version1 = Document.objects.create(
checksum="B-v1",
title="B version 1",
root_document=self.doc2,
)
version2 = Document.objects.create(
checksum="B-v2",
title="B version 2",
root_document=self.doc2,
)
root_doc, source_doc = bulk_edit._resolve_root_and_source_doc(
self.doc2,
source_mode="latest_version",
)
self.assertEqual(root_doc.id, self.doc2.id)
self.assertEqual(source_doc.id, version2.id)
self.assertNotEqual(source_doc.id, version1.id)
self.assertFalse(DocumentVersion.objects.filter(id=v1.id).exists())
@mock.patch("documents.tasks.bulk_update_documents.delay")
def test_set_permissions(self, m) -> None:
@@ -662,20 +630,11 @@ class TestPDFActions(DirectoriesMixin, TestCase):
@mock.patch("pikepdf.open")
@mock.patch("documents.tasks.consume_file.s")
def test_merge_uses_latest_version_source_for_root_selection(
def test_merge_uses_document_source_path(
self,
mock_consume_file,
mock_open_pdf,
) -> None:
version_file = self.dirs.scratch_dir / "sample2_version_merge.pdf"
shutil.copy(self.doc2.source_path, version_file)
version = Document.objects.create(
checksum="B-v1",
title="B version 1",
root_document=self.doc2,
filename=version_file,
mime_type="application/pdf",
)
fake_pdf = mock.MagicMock()
fake_pdf.pdf_version = "1.7"
fake_pdf.pages = [mock.Mock()]
@@ -684,7 +643,7 @@ class TestPDFActions(DirectoriesMixin, TestCase):
result = bulk_edit.merge([self.doc2.id])
self.assertEqual(result, "OK")
mock_open_pdf.assert_called_once_with(str(version.source_path))
mock_open_pdf.assert_called_once_with(str(self.doc2.source_path))
mock_consume_file.assert_not_called()
@mock.patch("documents.bulk_edit.delete.si")
@@ -898,21 +857,12 @@ class TestPDFActions(DirectoriesMixin, TestCase):
@mock.patch("documents.bulk_edit.group")
@mock.patch("pikepdf.open")
@mock.patch("documents.tasks.consume_file.s")
def test_split_uses_latest_version_source_for_root_selection(
def test_split_uses_document_source_path(
self,
mock_consume_file,
mock_open_pdf,
mock_group,
) -> None:
version_file = self.dirs.scratch_dir / "sample2_version_split.pdf"
shutil.copy(self.doc2.source_path, version_file)
version = Document.objects.create(
checksum="B-v1",
title="B version 1",
root_document=self.doc2,
filename=version_file,
mime_type="application/pdf",
)
fake_pdf = mock.MagicMock()
fake_pdf.pages = [mock.Mock(), mock.Mock()]
mock_open_pdf.return_value.__enter__.return_value = fake_pdf
@@ -921,7 +871,7 @@ class TestPDFActions(DirectoriesMixin, TestCase):
result = bulk_edit.split([self.doc2.id], [[1], [2]])
self.assertEqual(result, "OK")
mock_open_pdf.assert_called_once_with(version.source_path)
mock_open_pdf.assert_called_once_with(self.doc2.source_path)
mock_consume_file.assert_not_called()
mock_group.return_value.delay.assert_not_called()
@@ -1099,17 +1049,12 @@ class TestPDFActions(DirectoriesMixin, TestCase):
@mock.patch("documents.data_models.magic.from_file", return_value="application/pdf")
@mock.patch("documents.tasks.consume_file.delay")
@mock.patch("pikepdf.open")
def test_rotate_explicit_selection_uses_root_source_when_root_selected(
def test_rotate_uses_document_source_path(
self,
mock_open,
mock_consume_delay,
mock_magic,
):
Document.objects.create(
checksum="B-v1",
title="B version 1",
root_document=self.doc2,
)
fake_pdf = mock.MagicMock()
fake_pdf.pages = [mock.Mock()]
mock_open.return_value.__enter__.return_value = fake_pdf
@@ -1117,7 +1062,6 @@ class TestPDFActions(DirectoriesMixin, TestCase):
result = bulk_edit.rotate(
[self.doc2.id],
90,
source_mode="explicit_selection",
)
self.assertEqual(result, "OK")
@@ -1151,17 +1095,12 @@ class TestPDFActions(DirectoriesMixin, TestCase):
@mock.patch("documents.data_models.magic.from_file", return_value="application/pdf")
@mock.patch("documents.tasks.consume_file.delay")
@mock.patch("pikepdf.open")
def test_delete_pages_explicit_selection_uses_root_source_when_root_selected(
def test_delete_pages_uses_document_source_path(
self,
mock_open,
mock_consume_delay,
mock_magic,
):
Document.objects.create(
checksum="B-v1",
title="B version 1",
root_document=self.doc2,
)
fake_pdf = mock.MagicMock()
fake_pdf.pages = [mock.Mock(), mock.Mock()]
mock_open.return_value.__enter__.return_value = fake_pdf
@@ -1169,7 +1108,6 @@ class TestPDFActions(DirectoriesMixin, TestCase):
result = bulk_edit.delete_pages(
[self.doc2.id],
[1],
source_mode="explicit_selection",
)
self.assertEqual(result, "OK")
@@ -1328,18 +1266,13 @@ class TestPDFActions(DirectoriesMixin, TestCase):
@mock.patch("documents.tasks.consume_file.delay")
@mock.patch("pikepdf.new")
@mock.patch("pikepdf.open")
def test_edit_pdf_explicit_selection_uses_root_source_when_root_selected(
def test_edit_pdf_uses_document_source_path(
self,
mock_open,
mock_new,
mock_consume_delay,
mock_magic,
):
Document.objects.create(
checksum="B-v1",
title="B version 1",
root_document=self.doc2,
)
fake_pdf = mock.MagicMock()
fake_pdf.pages = [mock.Mock()]
mock_open.return_value.__enter__.return_value = fake_pdf
@@ -1351,7 +1284,6 @@ class TestPDFActions(DirectoriesMixin, TestCase):
[self.doc2.id],
operations=[{"page": 1}],
update_document=True,
source_mode="explicit_selection",
)
self.assertEqual(result, "OK")
@@ -1481,17 +1413,12 @@ class TestPDFActions(DirectoriesMixin, TestCase):
@mock.patch("documents.data_models.magic.from_file", return_value="application/pdf")
@mock.patch("documents.tasks.consume_file.delay")
@mock.patch("pikepdf.open")
def test_remove_password_explicit_selection_uses_root_source_when_root_selected(
def test_remove_password_uses_document_source_path(
self,
mock_open,
mock_consume_delay,
mock_magic,
) -> None:
Document.objects.create(
checksum="A-v1",
title="A version 1",
root_document=self.doc1,
)
fake_pdf = mock.MagicMock()
mock_open.return_value.__enter__.return_value = fake_pdf
@@ -1499,7 +1426,6 @@ class TestPDFActions(DirectoriesMixin, TestCase):
[self.doc1.id],
password="secret",
update_document=True,
source_mode="explicit_selection",
)
self.assertEqual(result, "OK")


@@ -25,6 +25,7 @@ from documents.models import Correspondent
from documents.models import CustomField
from documents.models import Document
from documents.models import DocumentType
from documents.models import DocumentVersion
from documents.models import StoragePath
from documents.models import Tag
from documents.parsers import ParseError
@@ -725,7 +726,8 @@ class TestConsumer(
document = Document.objects.first()
assert document is not None
self.assertEqual(document.version_label, "v1")
version = DocumentVersion.objects.get(document=document, version_number=1)
self.assertEqual(version.version_label, "v1")
self._assert_first_last_send_progress()
@@ -790,16 +792,17 @@ class TestConsumer(
finally:
consumer.cleanup()
versions = Document.objects.filter(root_document=root_doc)
self.assertEqual(versions.count(), 1)
version = versions.first()
assert version is not None
assert version.original_filename is not None
self.assertEqual(version.version_index, 1)
self.assertEqual(version.version_label, "v2")
self.assertIsNone(version.archive_serial_number)
self.assertEqual(version.original_filename, version_file.name)
self.assertTrue(bool(version.content))
# Initial consume already created version_number=1.
# Version upload created version_number=2.
versions = DocumentVersion.objects.filter(document=root_doc).order_by(
"version_number",
)
self.assertEqual(versions.count(), 2)
uploaded = versions.get(version_number=2)
assert uploaded.original_filename is not None
self.assertEqual(uploaded.version_label, "v2")
self.assertEqual(uploaded.original_filename, version_file.name)
self.assertTrue(bool(uploaded.content))
@override_settings(AUDIT_LOG_ENABLED=True)
@mock.patch("documents.consumer.load_classifier")
@@ -852,14 +855,16 @@ class TestConsumer(
finally:
consumer.cleanup()
version = (
Document.objects.filter(root_document=root_doc).order_by("-id").first()
# Initial consume already created version_number=1.
# Version upload created version_number=2.
versions = DocumentVersion.objects.filter(document=root_doc).order_by(
"version_number",
)
self.assertIsNotNone(version)
assert version is not None
self.assertEqual(version.version_index, 1)
self.assertEqual(version.original_filename, "valid_pdf_version-upload")
self.assertTrue(bool(version.content))
uploaded = versions.get(version_number=2)
self.assertIsNotNone(uploaded)
assert uploaded is not None
self.assertEqual(uploaded.original_filename, "valid_pdf_version-upload")
self.assertTrue(bool(uploaded.content))
@override_settings(AUDIT_LOG_ENABLED=True)
@mock.patch("documents.consumer.load_classifier")
@@ -873,7 +878,7 @@ class TestConsumer(
self.assertIsNotNone(root_doc)
assert root_doc is not None
def consume_version(version_file: Path) -> Document:
def consume_version(version_file: Path) -> DocumentVersion:
status = DummyProgressManager(version_file.name, None)
overrides = DocumentMetadataOverrides()
doc = ConsumableDocument(
@@ -905,18 +910,22 @@ class TestConsumer(
consumer.cleanup()
version = (
Document.objects.filter(root_document=root_doc).order_by("-id").first()
DocumentVersion.objects.filter(document=root_doc)
.order_by("-version_number")
.first()
)
assert version is not None
return version
v1 = consume_version(self.get_test_file2())
self.assertEqual(v1.version_index, 1)
v1.delete()
# First upload: version_number=2 (version 1 was created at initial consume)
v1_dv = consume_version(self.get_test_file2())
self.assertEqual(v1_dv.version_number, 2)
v1_dv.delete()
# The next version should have version_index 2, even though version_index 1 was deleted
v2 = consume_version(self.get_test_file())
self.assertEqual(v2.version_index, 2)
# After deleting version_number=2, MAX is 1 (the initial consume version).
# The next upload gets MAX+1 = 2.
v2_dv = consume_version(self.get_test_file())
self.assertEqual(v2_dv.version_number, 2)
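The numbering rule the comments above describe, where a new version always gets MAX(existing) + 1, can be sketched as follows (hypothetical helper, not the project's API):

```python
def next_version_number(existing_numbers: list[int]) -> int:
    # New versions are assigned MAX(existing) + 1, so if the highest
    # version is deleted, its number is reused by the next upload.
    return max(existing_numbers, default=0) + 1

# Initial consume creates version 1; the first upload gets 2.
# After deleting version 2, MAX drops back to 1 and the next upload
# gets 2 again -- exactly what the test above asserts.
```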
@mock.patch("documents.consumer.load_classifier")
def testClassifyDocument(self, m) -> None:


@@ -102,27 +102,34 @@ class TestDocument(TestCase):
self.assertEqual(len(actual_deletions), 2)
def test_delete_root_deletes_versions(self) -> None:
root = Document.objects.create(
def test_delete_document_cascades_to_versions(self) -> None:
from documents.models import DocumentVersion
doc = Document.objects.create(
correspondent=Correspondent.objects.create(name="Test0"),
title="Head",
content="content",
checksum="checksum",
mime_type="application/pdf",
)
Document.objects.create(
root_document=root,
correspondent=root.correspondent,
title="Version",
content="content",
DocumentVersion.objects.create(
document=doc,
version_number=1,
checksum="checksum",
mime_type="application/pdf",
)
DocumentVersion.objects.create(
document=doc,
version_number=2,
checksum="checksum2",
mime_type="application/pdf",
)
root.delete()
self.assertEqual(DocumentVersion.objects.filter(document=doc).count(), 2)
doc.delete()
self.assertEqual(Document.objects.count(), 0)
self.assertEqual(Document.deleted_objects.count(), 2)
self.assertEqual(DocumentVersion.objects.count(), 0)
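The cascade this test relies on can be illustrated without Django (plain-Python sketch, not project code): deleting a document removes every version row that points at it.

```python
documents = {1: "Head"}
versions = {10: 1, 11: 1}  # version pk -> owning document pk

def delete_document(doc_pk: int) -> None:
    # Mirrors on_delete=CASCADE: the document row and all dependent
    # version rows are removed together.
    documents.pop(doc_pk, None)
    for version_pk in [v for v, d in versions.items() if d == doc_pk]:
        del versions[version_pk]
```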
def test_file_name(self) -> None:
doc = Document(
@@ -156,45 +163,27 @@ class TestDocument(TestCase):
)
self.assertEqual(doc.get_public_filename(), "2020-12-25 test")
def test_suggestion_content_uses_latest_version_content_for_root_documents(
self,
) -> None:
root = Document.objects.create(
title="root",
checksum="root",
def test_suggestion_content_returns_document_content(self) -> None:
doc = Document.objects.create(
title="doc",
checksum="doc",
mime_type="application/pdf",
content="outdated root content",
)
version = Document.objects.create(
title="v1",
checksum="v1",
mime_type="application/pdf",
root_document=root,
content="latest version content",
content="the document content",
)
self.assertEqual(root.suggestion_content, version.content)
self.assertEqual(doc.suggestion_content, "the document content")
def test_content_length_is_per_document_row_for_versions(self) -> None:
root = Document.objects.create(
title="root",
checksum="root",
def test_content_length_reflects_document_content(self) -> None:
doc = Document.objects.create(
title="doc",
checksum="doc",
mime_type="application/pdf",
content="abc",
)
version = Document.objects.create(
title="v1",
checksum="v1",
mime_type="application/pdf",
root_document=root,
content="abcdefgh",
)
root.refresh_from_db()
version.refresh_from_db()
doc.refresh_from_db()
self.assertEqual(root.content_length, 3)
self.assertEqual(version.content_length, 8)
self.assertEqual(doc.content_length, 3)
def test_suggestion_content() -> None:


@@ -23,6 +23,7 @@ from documents.models import CustomField
from documents.models import CustomFieldInstance
from documents.models import Document
from documents.models import DocumentType
from documents.models import DocumentVersion
from documents.models import StoragePath
from documents.tasks import empty_trash
from documents.tests.factories import DocumentFactory
@@ -78,58 +79,6 @@ class TestFileHandling(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
settings.ORIGINALS_DIR / "test" / "test.pdf",
)
@override_settings(FILENAME_FORMAT=None)
def test_root_storage_path_change_updates_version_files(self) -> None:
old_storage_path = StoragePath.objects.create(
name="old-path",
path="old/{{title}}",
)
new_storage_path = StoragePath.objects.create(
name="new-path",
path="new/{{title}}",
)
root_doc = Document.objects.create(
title="rootdoc",
mime_type="application/pdf",
checksum="root-checksum",
storage_path=old_storage_path,
)
version_doc = Document.objects.create(
title="version-title",
mime_type="application/pdf",
checksum="version-checksum",
root_document=root_doc,
version_index=1,
)
Document.objects.filter(pk=root_doc.pk).update(
filename=generate_filename(root_doc),
)
Document.objects.filter(pk=version_doc.pk).update(
filename=generate_filename(version_doc),
)
root_doc.refresh_from_db()
version_doc.refresh_from_db()
create_source_path_directory(root_doc.source_path)
Path(root_doc.source_path).touch()
create_source_path_directory(version_doc.source_path)
Path(version_doc.source_path).touch()
root_doc.storage_path = new_storage_path
root_doc.save()
root_doc.refresh_from_db()
version_doc.refresh_from_db()
self.assertEqual(root_doc.filename, "new/rootdoc.pdf")
self.assertEqual(version_doc.filename, "new/rootdoc_v1.pdf")
self.assertIsFile(root_doc.source_path)
self.assertIsFile(version_doc.source_path)
self.assertIsNotFile(settings.ORIGINALS_DIR / "old" / "rootdoc.pdf")
self.assertIsNotFile(settings.ORIGINALS_DIR / "old" / "rootdoc_v1.pdf")
@override_settings(FILENAME_FORMAT="{correspondent}/{correspondent}")
def test_file_renaming_missing_permissions(self) -> None:
document = Document()
@@ -1325,93 +1274,16 @@ class TestFilenameGeneration(DirectoriesMixin, TestCase):
Path("logs.pdf"),
)
@override_settings(FILENAME_FORMAT="{title}")
def test_version_index_suffix_for_template_filename(self) -> None:
root_doc = Document.objects.create(
title="the_doc",
def test_version_number_suffix_in_filename(self) -> None:
"""generate_filename appends _vN when a DocumentVersion is passed."""
doc = Document.objects.create(
title="versioned",
checksum="c",
mime_type="application/pdf",
checksum="root-checksum",
)
version_doc = Document.objects.create(
title="the_doc",
mime_type="application/pdf",
checksum="version-checksum",
root_document=root_doc,
version_index=1,
)
self.assertEqual(generate_filename(version_doc), Path("the_doc_v1.pdf"))
self.assertEqual(
generate_filename(version_doc, counter=1),
Path("the_doc_v1_01.pdf"),
)
@override_settings(FILENAME_FORMAT=None)
def test_version_index_suffix_for_default_filename(self) -> None:
root_doc = Document.objects.create(
title="root",
mime_type="text/plain",
checksum="root-checksum",
)
version_doc = Document.objects.create(
title="root",
mime_type="text/plain",
checksum="version-checksum",
root_document=root_doc,
version_index=2,
)
self.assertEqual(
generate_filename(version_doc),
Path(f"{root_doc.pk:07d}_v2.txt"),
)
self.assertEqual(
generate_filename(version_doc, archive_filename=True),
Path(f"{root_doc.pk:07d}_v2.pdf"),
)
@override_settings(FILENAME_FORMAT="{original_name}")
def test_version_index_suffix_with_original_name_placeholder(self) -> None:
root_doc = Document.objects.create(
title="root",
mime_type="application/pdf",
checksum="root-checksum",
original_filename="root-upload.pdf",
)
version_doc = Document.objects.create(
title="root",
mime_type="application/pdf",
checksum="version-checksum",
root_document=root_doc,
version_index=1,
original_filename="version-upload.pdf",
)
self.assertEqual(generate_filename(version_doc), Path("root-upload_v1.pdf"))
def test_version_index_suffix_with_storage_path(self) -> None:
storage_path = StoragePath.objects.create(
name="vtest",
path="folder/{{title}}",
)
root_doc = Document.objects.create(
title="storage_doc",
mime_type="application/pdf",
checksum="root-checksum",
storage_path=storage_path,
)
version_doc = Document.objects.create(
title="version_title_should_not_be_used",
mime_type="application/pdf",
checksum="version-checksum",
root_document=root_doc,
version_index=3,
)
self.assertEqual(
generate_filename(version_doc),
Path("folder/storage_doc_v3.pdf"),
)
version = DocumentVersion(document=doc, version_number=2)
result = generate_filename(doc, version=version, use_format=False)
assert "_v2" in str(result)
@override_settings(
FILENAME_FORMAT="XX{correspondent}/{title}",


@@ -48,19 +48,12 @@ class _TestMatchingBase(TestCase):
class TestMatching(_TestMatchingBase):
def test_matches_uses_latest_version_content_for_root_documents(self) -> None:
root = Document.objects.create(
title="root",
checksum="root",
def test_matches_uses_document_content(self) -> None:
doc = Document.objects.create(
title="doc",
checksum="doc",
mime_type="application/pdf",
content="root content without token",
)
Document.objects.create(
title="v1",
checksum="v1",
mime_type="application/pdf",
root_document=root,
content="latest version contains keyword",
content="document contains keyword",
)
tag = Tag.objects.create(
name="tag",
@@ -68,23 +61,14 @@ class TestMatching(_TestMatchingBase):
matching_algorithm=Tag.MATCH_ANY,
)
self.assertTrue(matching.matches(tag, root))
self.assertTrue(matching.matches(tag, doc))
def test_matches_does_not_fall_back_to_root_content_when_version_exists(
self,
) -> None:
root = Document.objects.create(
title="root",
checksum="root",
def test_matches_does_not_match_when_content_lacks_keyword(self) -> None:
doc = Document.objects.create(
title="doc",
checksum="doc",
mime_type="application/pdf",
content="root contains keyword",
)
Document.objects.create(
title="v1",
checksum="v1",
mime_type="application/pdf",
root_document=root,
content="latest version without token",
content="document without the token",
)
tag = Tag.objects.create(
name="tag",
@@ -92,7 +76,7 @@ class TestMatching(_TestMatchingBase):
matching_algorithm=Tag.MATCH_ANY,
)
self.assertFalse(matching.matches(tag, root))
self.assertFalse(matching.matches(tag, doc))
def test_match_none(self) -> None:
self._test_matching(


@@ -17,8 +17,8 @@ def _sha256(data: bytes) -> str:
class TestSha256ChecksumDataMigration(TestMigrations):
"""recompute_checksums correctly updates document checksums from MD5 to SHA256."""
migrate_from = "0015_document_version_index_and_more"
migrate_to = "0016_sha256_checksums"
migrate_from = "0014_savedview_visibility_to_ui_settings"
migrate_to = "0015_sha256_checksums"
reset_sequences = True
ORIGINAL_CONTENT = b"original file content for sha256 migration test"


@@ -10,7 +10,7 @@ from documents.data_models import DocumentMetadataOverrides
from documents.data_models import DocumentSource
from documents.models import Document
from documents.models import PaperlessTask
from documents.signals.handlers import add_to_index
from documents.signals import document_consumption_finished
from documents.signals.handlers import before_task_publish_handler
from documents.signals.handlers import task_failure_handler
from documents.signals.handlers import task_postrun_handler
@@ -207,44 +207,17 @@ class TestTaskSignalHandler(DirectoriesMixin, TestCase):
self.assertEqual(celery.states.FAILURE, task.status)
def test_add_to_index_indexes_root_once_for_root_documents(self) -> None:
root = Document.objects.create(
title="root",
checksum="root",
def test_add_to_index_calls_add_or_update(self) -> None:
doc = Document.objects.create(
title="test",
checksum="abc",
mime_type="application/pdf",
)
with mock.patch("documents.search.get_backend") as mock_get_backend:
mock_backend = mock.MagicMock()
mock_get_backend.return_value = mock_backend
add_to_index(sender=None, document=root)
mock_backend.add_or_update.assert_called_once_with(root, effective_content="")
def test_add_to_index_reindexes_root_for_version_documents(self) -> None:
root = Document.objects.create(
title="root",
checksum="root",
mime_type="application/pdf",
)
version = Document.objects.create(
title="version",
checksum="version",
mime_type="application/pdf",
root_document=root,
)
with mock.patch("documents.search.get_backend") as mock_get_backend:
mock_backend = mock.MagicMock()
mock_get_backend.return_value = mock_backend
add_to_index(sender=None, document=version)
self.assertEqual(mock_backend.add_or_update.call_count, 1)
self.assertEqual(
mock_backend.add_or_update.call_args_list[0].args[0].id,
version.id,
)
self.assertEqual(
mock_backend.add_or_update.call_args_list[0].kwargs,
{"effective_content": version.content},
)
document_consumption_finished.send(
sender=self.__class__,
document=doc,
)
mock_backend.add_or_update.assert_called_once_with(doc)


@@ -1,91 +1,86 @@
from __future__ import annotations
from types import SimpleNamespace
from unittest import mock
from django.test import TestCase
import pytest
from documents.conditionals import metadata_etag
from documents.conditionals import preview_etag
from documents.conditionals import thumbnail_last_modified
from documents.models import Document
from documents.tests.utils import DirectoriesMixin
from documents.versioning import resolve_effective_document_by_pk
from documents.tests.factories import DocumentFactory
from documents.tests.factories import DocumentVersionFactory
from documents.versioning import VersionResolutionError
from documents.versioning import get_latest_version
from documents.versioning import get_version_by_pk
from documents.versioning import resolve_requested_version
class TestConditionals(DirectoriesMixin, TestCase):
def test_metadata_etag_uses_latest_version_for_root_request(self) -> None:
root = Document.objects.create(
title="root",
checksum="root-checksum",
archive_checksum="root-archive",
mime_type="application/pdf",
)
latest = Document.objects.create(
title="v1",
checksum="version-checksum",
archive_checksum="version-archive",
mime_type="application/pdf",
root_document=root,
)
@pytest.mark.django_db
class TestGetLatestVersion:
def test_returns_highest_version_number(self) -> None:
doc = DocumentFactory()
DocumentVersionFactory(document=doc, version_number=1)
DocumentVersionFactory(document=doc, version_number=2)
v3 = DocumentVersionFactory(document=doc, version_number=3)
result = get_latest_version(doc)
assert result is not None
assert result.pk == v3.pk
def test_returns_none_when_no_versions(self) -> None:
doc = DocumentFactory()
assert get_latest_version(doc) is None
@pytest.mark.django_db
class TestGetVersionByPk:
def test_returns_version_belonging_to_document(self) -> None:
doc = DocumentFactory()
v = DocumentVersionFactory(document=doc, version_number=1)
result = get_version_by_pk(doc, v.pk)
assert result is not None
assert result.pk == v.pk
def test_returns_none_for_unrelated_version(self) -> None:
doc_a = DocumentFactory()
doc_b = DocumentFactory()
v_b = DocumentVersionFactory(document=doc_b, version_number=1)
assert get_version_by_pk(doc_a, v_b.pk) is None
def test_returns_none_for_nonexistent_pk(self) -> None:
doc = DocumentFactory()
assert get_version_by_pk(doc, 999999) is None
@pytest.mark.django_db
class TestResolveRequestedVersion:
def test_no_version_param_returns_latest(self) -> None:
doc = DocumentFactory()
DocumentVersionFactory(document=doc, version_number=1)
v2 = DocumentVersionFactory(document=doc, version_number=2)
request = SimpleNamespace(query_params={})
result = resolve_requested_version(doc, request)
assert result.version is not None
assert result.version.pk == v2.pk
assert result.error is None
self.assertEqual(metadata_etag(request, root.id), latest.checksum)
self.assertEqual(preview_etag(request, root.id), latest.archive_checksum)
def test_explicit_version_param_returns_that_version(self) -> None:
doc = DocumentFactory()
v1 = DocumentVersionFactory(document=doc, version_number=1)
DocumentVersionFactory(document=doc, version_number=2)
request = SimpleNamespace(query_params={"version": str(v1.pk)})
result = resolve_requested_version(doc, request)
assert result.version is not None
assert result.version.pk == v1.pk
def test_resolve_effective_doc_returns_none_for_invalid_or_unrelated_version(
self,
) -> None:
root = Document.objects.create(
title="root",
checksum="root",
mime_type="application/pdf",
)
other_root = Document.objects.create(
title="other",
checksum="other",
mime_type="application/pdf",
)
other_version = Document.objects.create(
title="other-v1",
checksum="other-v1",
mime_type="application/pdf",
root_document=other_root,
)
def test_invalid_version_param_returns_error(self) -> None:
doc = DocumentFactory()
request = SimpleNamespace(query_params={"version": "notanint"})
result = resolve_requested_version(doc, request)
assert result.version is None
assert result.error == VersionResolutionError.INVALID
invalid_request = SimpleNamespace(query_params={"version": "not-a-number"})
unrelated_request = SimpleNamespace(
query_params={"version": str(other_version.id)},
)
self.assertIsNone(
resolve_effective_document_by_pk(root.id, invalid_request).document,
)
self.assertIsNone(
resolve_effective_document_by_pk(root.id, unrelated_request).document,
)
def test_thumbnail_last_modified_uses_effective_document_for_cache_key(
self,
) -> None:
root = Document.objects.create(
title="root",
checksum="root",
mime_type="application/pdf",
)
latest = Document.objects.create(
title="v2",
checksum="v2",
mime_type="application/pdf",
root_document=root,
)
latest.thumbnail_path.parent.mkdir(parents=True, exist_ok=True)
latest.thumbnail_path.write_bytes(b"thumb")
request = SimpleNamespace(query_params={})
with mock.patch(
"documents.conditionals.get_thumbnail_modified_key",
return_value="thumb-modified-key",
) as get_thumb_key:
result = thumbnail_last_modified(request, root.id)
self.assertIsNotNone(result)
get_thumb_key.assert_called_once_with(latest.id)
def test_unrelated_version_id_returns_not_found(self) -> None:
doc_a = DocumentFactory()
doc_b = DocumentFactory()
v_b = DocumentVersionFactory(document=doc_b, version_number=1)
request = SimpleNamespace(query_params={"version": str(v_b.pk)})
result = resolve_requested_version(doc_a, request)
assert result.version is None
assert result.error == VersionResolutionError.NOT_FOUND


@@ -1860,7 +1860,10 @@ class TestWorkflows(
self.assertEqual(doc.title, "Doc {created_year]")
def test_document_updated_workflow_ignores_version_documents(self) -> None:
def test_document_updated_workflow_runs_on_versioned_document(self) -> None:
"""Workflows apply to documents even when they have DocumentVersion records."""
from documents.models import DocumentVersion
trigger = WorkflowTrigger.objects.create(
type=WorkflowTrigger.WorkflowTriggerType.DOCUMENT_UPDATED,
)
@@ -1875,30 +1878,27 @@ class TestWorkflows(
workflow.triggers.add(trigger)
workflow.actions.add(action)
root_doc = Document.objects.create(
title="root",
doc = Document.objects.create(
title="doc",
correspondent=self.c,
original_filename="root.pdf",
original_filename="doc.pdf",
)
version_doc = Document.objects.create(
title="version",
correspondent=self.c,
original_filename="version.pdf",
root_document=root_doc,
DocumentVersion.objects.create(
document=doc,
version_number=1,
checksum="abc",
mime_type="application/pdf",
)
run_workflows(WorkflowTrigger.WorkflowTriggerType.DOCUMENT_UPDATED, version_doc)
run_workflows(WorkflowTrigger.WorkflowTriggerType.DOCUMENT_UPDATED, doc)
root_doc.refresh_from_db()
version_doc.refresh_from_db()
self.assertIsNone(root_doc.owner)
self.assertIsNone(version_doc.owner)
self.assertFalse(
doc.refresh_from_db()
self.assertEqual(doc.owner, self.user2)
self.assertTrue(
WorkflowRun.objects.filter(
workflow=workflow,
type=WorkflowTrigger.WorkflowTriggerType.DOCUMENT_UPDATED,
document=version_doc,
document=doc,
).exists(),
)
@@ -2200,7 +2200,10 @@ class TestWorkflows(
doc.refresh_from_db()
self.assertEqual(doc.owner, self.user2)
def test_workflow_scheduled_trigger_ignores_version_documents(self) -> None:
def test_workflow_scheduled_trigger_runs_on_versioned_document(self) -> None:
"""Scheduled workflows run against documents that have DocumentVersion records."""
from documents.models import DocumentVersion
trigger = WorkflowTrigger.objects.create(
type=WorkflowTrigger.WorkflowTriggerType.SCHEDULED,
schedule_offset_days=1,
@@ -2217,42 +2220,31 @@ class TestWorkflows(
workflow.triggers.add(trigger)
workflow.actions.add(action)
root_doc = Document.objects.create(
title="root",
doc = Document.objects.create(
title="doc",
correspondent=self.c,
original_filename="root.pdf",
original_filename="doc.pdf",
added=timezone.now() - timedelta(days=10),
)
version_doc = Document.objects.create(
title="version",
correspondent=self.c,
original_filename="version.pdf",
root_document=root_doc,
added=timezone.now() - timedelta(days=10),
DocumentVersion.objects.create(
document=doc,
version_number=1,
checksum="abc",
mime_type="application/pdf",
)
tasks.check_scheduled_workflows()
root_doc.refresh_from_db()
version_doc.refresh_from_db()
self.assertEqual(root_doc.owner, self.user2)
self.assertIsNone(version_doc.owner)
doc.refresh_from_db()
self.assertEqual(doc.owner, self.user2)
self.assertEqual(
WorkflowRun.objects.filter(
workflow=workflow,
type=WorkflowTrigger.WorkflowTriggerType.SCHEDULED,
document=root_doc,
document=doc,
).count(),
1,
)
self.assertFalse(
WorkflowRun.objects.filter(
workflow=workflow,
type=WorkflowTrigger.WorkflowTriggerType.SCHEDULED,
document=version_doc,
).exists(),
)
@mock.patch("documents.models.Document.objects.filter", autospec=True)
def test_workflow_scheduled_trigger_modified(self, mock_filter) -> None:


@@ -6,6 +6,7 @@ from typing import TYPE_CHECKING
from typing import Any
from documents.models import Document
from documents.models import DocumentVersion
if TYPE_CHECKING:
from django.http import HttpRequest
@@ -18,107 +19,55 @@ class VersionResolutionError(StrEnum):
@dataclass(frozen=True, slots=True)
class VersionResolution:
document: Document | None
version: DocumentVersion | None
error: VersionResolutionError | None = None
def _document_manager(*, include_deleted: bool) -> Any:
return Document.global_objects if include_deleted else Document.objects
def get_request_version_param(request: HttpRequest) -> str | None:
if hasattr(request, "query_params"):
return request.query_params.get("version")
return None
def get_root_document(doc: Document, *, include_deleted: bool = False) -> Document:
# Use root_document_id to avoid a query when this is already a root.
# If root_document isn't available, fall back to the document itself.
if doc.root_document_id is None:
return doc
if doc.root_document is not None:
return doc.root_document
manager = _document_manager(include_deleted=include_deleted)
root_doc = manager.only("id").filter(id=doc.root_document_id).first()
return root_doc or doc
def get_latest_version(doc: Document) -> DocumentVersion | None:
    """Return the DocumentVersion with the highest version_number for doc, or None."""
return (
DocumentVersion.objects.filter(document=doc).order_by("-version_number").first()
)
def get_latest_version_for_root(
root_doc: Document,
*,
include_deleted: bool = False,
) -> Document:
manager = _document_manager(include_deleted=include_deleted)
latest = manager.filter(root_document=root_doc).order_by("-id").first()
return latest or root_doc
def get_version_by_pk(doc: Document, version_pk: int) -> DocumentVersion | None:
"""Return the DocumentVersion with the given pk if it belongs to doc."""
return DocumentVersion.objects.filter(pk=version_pk, document=doc).first()
def resolve_requested_version_for_root(
root_doc: Document,
def resolve_requested_version(
doc: Document,
request: Any,
*,
include_deleted: bool = False,
) -> VersionResolution:
"""
Resolve the DocumentVersion to serve based on the optional ``?version=<pk>``
query parameter.
- No parameter: return the latest version.
- Parameter present: validate and return that specific version.
"""
version_param = get_request_version_param(request)
if not version_param:
return VersionResolution(
document=get_latest_version_for_root(
root_doc,
include_deleted=include_deleted,
),
)
latest = get_latest_version(doc)
if latest is None:
return VersionResolution(
version=None,
error=VersionResolutionError.NOT_FOUND,
)
return VersionResolution(version=latest)
try:
version_id = int(version_param)
version_pk = int(version_param)
except (TypeError, ValueError):
return VersionResolution(document=None, error=VersionResolutionError.INVALID)
return VersionResolution(version=None, error=VersionResolutionError.INVALID)
manager = _document_manager(include_deleted=include_deleted)
candidate = manager.only("id", "root_document_id").filter(id=version_id).first()
if candidate is None:
return VersionResolution(document=None, error=VersionResolutionError.NOT_FOUND)
if candidate.id != root_doc.id and candidate.root_document_id != root_doc.id:
return VersionResolution(document=None, error=VersionResolutionError.NOT_FOUND)
return VersionResolution(document=candidate)
def resolve_effective_document(
request_doc: Document,
request: Any,
*,
include_deleted: bool = False,
) -> VersionResolution:
root_doc = get_root_document(request_doc, include_deleted=include_deleted)
if get_request_version_param(request) is not None:
return resolve_requested_version_for_root(
root_doc,
request,
include_deleted=include_deleted,
)
if request_doc.root_document_id is None:
return VersionResolution(
document=get_latest_version_for_root(
root_doc,
include_deleted=include_deleted,
),
)
return VersionResolution(document=request_doc)
def resolve_effective_document_by_pk(
pk: int,
request: Any,
*,
include_deleted: bool = False,
) -> VersionResolution:
manager = _document_manager(include_deleted=include_deleted)
request_doc = manager.only("id", "root_document_id").filter(pk=pk).first()
if request_doc is None:
return VersionResolution(document=None, error=VersionResolutionError.NOT_FOUND)
return resolve_effective_document(
request_doc,
request,
include_deleted=include_deleted,
)
version = get_version_by_pk(doc, version_pk)
if version is None:
return VersionResolution(version=None, error=VersionResolutionError.NOT_FOUND)
return VersionResolution(version=version)
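The resolution flow above reduces to a small decision tree. A minimal, framework-free sketch (plain dataclasses and a list stand in for the Django models and queryset lookups; all names here are illustrative, not the real API) of how `resolve_requested_version` behaves for the cases the tests exercise:

```python
from dataclasses import dataclass
from enum import Enum


class VersionResolutionError(str, Enum):
    INVALID = "invalid"
    NOT_FOUND = "not_found"


@dataclass(frozen=True)
class FakeVersion:
    pk: int
    document_id: int
    version_number: int


@dataclass(frozen=True)
class Resolution:
    version: "FakeVersion | None"
    error: "VersionResolutionError | None" = None


def resolve(doc_id: int, versions: list, version_param: "str | None") -> Resolution:
    """Mirror of the view logic: no param -> latest; param -> validated lookup."""
    mine = [v for v in versions if v.document_id == doc_id]
    if not version_param:
        latest = max(mine, key=lambda v: v.version_number, default=None)
        if latest is None:
            return Resolution(None, VersionResolutionError.NOT_FOUND)
        return Resolution(latest)
    try:
        pk = int(version_param)
    except (TypeError, ValueError):
        return Resolution(None, VersionResolutionError.INVALID)
    # Scoping the lookup to `mine` is what makes unrelated pks NOT_FOUND.
    hit = next((v for v in mine if v.pk == pk), None)
    if hit is None:
        return Resolution(None, VersionResolutionError.NOT_FOUND)
    return Resolution(hit)


versions = [FakeVersion(10, 1, 1), FakeVersion(11, 1, 2), FakeVersion(12, 2, 1)]
assert resolve(1, versions, None).version.pk == 11        # no param: latest wins
assert resolve(1, versions, "10").version.pk == 10        # explicit pk
assert resolve(1, versions, "notanint").error is VersionResolutionError.INVALID
assert resolve(1, versions, "12").error is VersionResolutionError.NOT_FOUND  # other doc's version
```

The same four outcomes map one-to-one onto `TestResolveRequestedVersion` above.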


@@ -31,17 +31,13 @@ from django.db.migrations.loader import MigrationLoader
from django.db.migrations.recorder import MigrationRecorder
from django.db.models import Case
from django.db.models import Count
from django.db.models import F
from django.db.models import IntegerField
from django.db.models import Max
from django.db.models import Model
from django.db.models import OuterRef
from django.db.models import Prefetch
from django.db.models import Q
from django.db.models import Subquery
from django.db.models import Sum
from django.db.models import When
from django.db.models.functions import Coalesce
from django.db.models.functions import Lower
from django.db.models.manager import Manager
from django.http import FileResponse
@@ -146,6 +142,7 @@ from documents.models import CustomField
from documents.models import CustomFieldInstance
from documents.models import Document
from documents.models import DocumentType
from documents.models import DocumentVersion
from documents.models import Note
from documents.models import PaperlessTask
from documents.models import SavedView
@@ -217,10 +214,8 @@ from documents.tasks import train_classifier
from documents.tasks import update_document_parent_tags
from documents.utils import get_boolean
from documents.versioning import VersionResolutionError
from documents.versioning import get_latest_version_for_root
from documents.versioning import get_request_version_param
from documents.versioning import get_root_document
from documents.versioning import resolve_requested_version_for_root
from documents.versioning import get_latest_version
from documents.versioning import resolve_requested_version
from paperless import version
from paperless.celery import app as celery_app
from paperless.config import AIConfig
@@ -822,7 +817,7 @@ class DocumentViewSet(
ObjectOwnedOrGrantedPermissionsFilter,
)
filterset_class = DocumentFilterSet
search_fields = ("title", "correspondent__name", "effective_content")
search_fields = ("title", "correspondent__name", "content")
ordering_fields = (
"id",
"title",
@@ -895,27 +890,21 @@ class DocumentViewSet(
}
def get_queryset(self):
latest_version_content = Subquery(
Document.objects.filter(root_document=OuterRef("pk"))
.order_by("-id")
.values("content")[:1],
)
return (
Document.objects.filter(root_document__isnull=True)
.distinct()
Document.objects.distinct()
.order_by("-created")
.annotate(effective_content=Coalesce(latest_version_content, F("content")))
.annotate(num_notes=Count("notes"))
.select_related("correspondent", "storage_path", "document_type", "owner")
.prefetch_related(
Prefetch(
"versions",
queryset=Document.objects.only(
queryset=DocumentVersion.objects.only(
"id",
"added",
"checksum",
"version_label",
"root_document_id",
"version_number",
"document_id",
),
),
"tags",
@@ -943,35 +932,6 @@ class DocumentViewSet(
)
return super().get_serializer(*args, **kwargs)
@extend_schema(
operation_id="documents_root",
responses=inline_serializer(
name="DocumentRootResponse",
fields={
"root_id": serializers.IntegerField(),
},
),
)
@action(methods=["get"], detail=True, url_path="root")
def root(self, request, pk=None):
try:
doc = Document.global_objects.select_related(
"owner",
"root_document",
).get(pk=pk)
except Document.DoesNotExist:
raise Http404
root_doc = get_root_document(doc)
if request.user is not None and not has_perms_owner_aware(
request.user,
"view_document",
root_doc,
):
return HttpResponseForbidden("Insufficient permissions")
return Response({"root_id": root_doc.id})
def retrieve(
self,
request: Request,
@@ -997,14 +957,16 @@ class DocumentViewSet(
content_doc = (
self._resolve_file_doc(root_doc, request)
if "version" in request.query_params
else get_latest_version_for_root(root_doc)
else get_latest_version(root_doc)
)
content_updated = "content" in request.data
updated_content = request.data.get("content") if content_updated else None
data = request.data.copy()
serializer_partial = partial
if content_updated and content_doc.id != root_doc.id:
        # content_doc is a DocumentVersion (separate table); the write goes to that row.
content_is_versioned = isinstance(content_doc, DocumentVersion)
if content_updated and content_is_versioned:
if updated_content is None:
raise ValidationError({"content": ["This field may not be null."]})
data.pop("content", None)
@@ -1018,11 +980,20 @@ class DocumentViewSet(
serializer.is_valid(raise_exception=True)
self.perform_update(serializer)
if content_updated and content_doc.id != root_doc.id:
content_doc.content = (
str(updated_content) if updated_content is not None else ""
)
content_doc.save(update_fields=["content", "modified"])
if content_updated and content_is_versioned:
new_content = str(updated_content) if updated_content is not None else ""
content_doc.content = new_content
# DocumentVersion has no database ``modified`` field.
content_doc.save(update_fields=["content"])
# Keep Document.content in sync when the latest version is edited.
is_latest = not DocumentVersion.objects.filter(
document=root_doc,
version_number__gt=content_doc.version_number,
).exists()
if is_latest:
root_doc.content = new_content
root_doc.save(update_fields=["content"])
refreshed_doc = self.get_queryset().get(pk=root_doc.pk)
response_data = self.get_serializer(refreshed_doc).data
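The "keep Document.content in sync" branch above hinges on an existence test for a higher version_number. A framework-free sketch of that decision (plain ints stand in for `DocumentVersion` rows; the helper names are illustrative, not the real API):

```python
def is_latest(version_numbers: list, edited: int) -> bool:
    """True when no stored version outranks the edited one --
    mirrors the `version_number__gt` .exists() check in the view."""
    return not any(n > edited for n in version_numbers)


def synced_content(doc_content: str, version_numbers: list,
                   edited: int, new_content: str) -> str:
    """What Document.content should hold after the edit: the new text
    when the latest version was edited, otherwise the cached value."""
    return new_content if is_latest(version_numbers, edited) else doc_content


assert is_latest([1, 2, 3], 3) is True
assert is_latest([1, 2, 3], 2) is False
assert synced_content("old", [1, 2], 2, "new") == "new"   # latest edited: sync
assert synced_content("old", [1, 2], 1, "new") == "old"   # older edited: cache untouched
```

This is also why `test_b1`'s query threshold moved by one (see the commit log): the prefetch of versions replaces the old correlated subquery, and the sync check is a separate, cheap existence query.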
@@ -1083,30 +1054,20 @@ class DocumentViewSet(
and request.query_params["original"] == "true"
)
def _resolve_file_doc(self, root_doc: Document, request):
version_requested = get_request_version_param(request) is not None
resolution = resolve_requested_version_for_root(
root_doc,
request,
include_deleted=version_requested,
)
def _resolve_file_doc(self, root_doc: Document, request) -> "DocumentVersion":
resolution = resolve_requested_version(root_doc, request)
if resolution.error == VersionResolutionError.INVALID:
raise NotFound("Invalid version parameter")
if resolution.document is None:
if resolution.version is None:
raise Http404
return resolution.document
return resolution.version
def _get_effective_file_doc(
self,
request_doc: Document,
root_doc: Document,
request: Request,
) -> Document:
if (
request_doc.root_document_id is not None
and get_request_version_param(request) is None
):
return request_doc
) -> "DocumentVersion":
return self._resolve_file_doc(root_doc, request)
def _resolve_request_and_root_doc(
@@ -1118,24 +1079,17 @@ class DocumentViewSet(
) -> tuple[Document, Document] | HttpResponseForbidden:
manager = Document.global_objects if include_deleted else Document.objects
try:
request_doc = manager.select_related(
"owner",
"root_document",
).get(id=pk)
root_doc = manager.select_related("owner").get(id=pk)
except Document.DoesNotExist:
raise Http404
root_doc = get_root_document(
request_doc,
include_deleted=include_deleted,
)
if request.user is not None and not has_perms_owner_aware(
request.user,
"view_document",
root_doc,
):
return HttpResponseForbidden("Insufficient permissions")
return request_doc, root_doc
return root_doc, root_doc
def file_response(self, pk, request, disposition):
resolved = self._resolve_request_and_root_doc(
@@ -1704,11 +1658,7 @@ class DocumentViewSet(
serializer.is_valid(raise_exception=True)
try:
request_doc = Document.objects.select_related(
"owner",
"root_document",
).get(pk=pk)
root_doc = get_root_document(request_doc)
root_doc = Document.objects.select_related("owner").get(pk=pk)
if request.user is not None and not has_perms_owner_aware(
request.user,
"change_document",
@@ -1762,29 +1712,16 @@ class DocumentViewSet(
def _get_root_doc_for_version_action(self, pk) -> Document:
try:
root_doc = Document.objects.select_related(
"owner",
"root_document",
).get(pk=pk)
return Document.objects.select_related("owner").get(pk=pk)
except Document.DoesNotExist:
raise Http404
return get_root_document(root_doc)
def _get_version_doc_for_root(self, root_doc: Document, version_id) -> Document:
def _get_version_for_doc(self, doc: Document, version_pk: int) -> "DocumentVersion":
try:
version_doc = Document.objects.select_related("owner").get(
pk=version_id,
)
except Document.DoesNotExist:
return DocumentVersion.objects.get(pk=version_pk, document=doc)
except DocumentVersion.DoesNotExist:
raise Http404
if (
version_doc.id != root_doc.id
and version_doc.root_document_id != root_doc.id
):
raise Http404
return version_doc
@extend_schema(
operation_id="documents_delete_version",
parameters=[
@@ -1798,7 +1735,7 @@ class DocumentViewSet(
name="DeleteDocumentVersionResult",
fields={
"result": serializers.CharField(),
"current_version_id": serializers.IntegerField(),
"current_version_id": serializers.IntegerField(allow_null=True),
},
),
)
@@ -1809,7 +1746,6 @@ class DocumentViewSet(
)
def delete_version(self, request, pk=None, version_id=None):
root_doc = self._get_root_doc_for_version_action(pk)
if request.user is not None and not has_perms_owner_aware(
request.user,
"delete_document",
@@ -1817,51 +1753,82 @@ class DocumentViewSet(
):
return HttpResponseForbidden("Insufficient permissions")
version_doc = self._get_version_doc_for_root(root_doc, version_id)
version = self._get_version_for_doc(root_doc, int(version_id))
if version_doc.id == root_doc.id:
if DocumentVersion.objects.filter(document=root_doc).count() <= 1:
return HttpResponseBadRequest(
"Cannot delete the root/original version. Delete the document instead.",
"Cannot delete the only remaining version. Delete the document instead.",
)
from documents.search import get_backend
_backend = get_backend()
_backend.remove(version_doc.pk)
version_doc_id = version_doc.id
version_doc.delete()
deleted_pk = version.pk
# Capture whether this is the latest version before deleting.
was_latest = not DocumentVersion.objects.filter(
document=root_doc,
version_number__gt=version.version_number,
).exists()
version.delete()
# Only sync Document cache fields if the deleted version was the latest.
if was_latest:
new_latest = (
DocumentVersion.objects.filter(document=root_doc)
.order_by("-version_number")
.first()
)
if new_latest is not None:
root_doc.content = new_latest.content
root_doc.checksum = new_latest.checksum
root_doc.archive_checksum = new_latest.archive_checksum
root_doc.filename = new_latest.filename
root_doc.archive_filename = new_latest.archive_filename
root_doc.mime_type = new_latest.mime_type
root_doc.page_count = new_latest.page_count
root_doc.original_filename = new_latest.original_filename
root_doc.modified = timezone.now()
root_doc.save(
update_fields=[
"content",
"checksum",
"archive_checksum",
"filename",
"archive_filename",
"mime_type",
"page_count",
"original_filename",
"modified",
],
)
_backend.add_or_update(root_doc)
if settings.AUDIT_LOG_ENABLED:
actor = (
request.user if request.user and request.user.is_authenticated else None
)
from auditlog.models import LogEntry
LogEntry.objects.log_create(
instance=root_doc,
changes={
"Version Deleted": ["None", version_doc_id],
},
changes={"Version Deleted": ["None", deleted_pk]},
action=LogEntry.Action.UPDATE,
actor=actor,
additional_data={
"reason": "Version deleted",
"version_id": version_doc_id,
},
additional_data={"reason": "Version deleted", "version_id": deleted_pk},
)
current = (
Document.objects.filter(Q(id=root_doc.id) | Q(root_document=root_doc))
.order_by("-id")
DocumentVersion.objects.filter(document=root_doc)
.order_by("-version_number")
.first()
)
document_updated.send(
sender=self.__class__,
document=root_doc,
)
document_updated.send(sender=self.__class__, document=root_doc)
return Response(
{
"result": "OK",
"current_version_id": current.id if current else root_doc.id,
"current_version_id": current.pk if current else None,
},
)
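The delete-and-resync step above condenses into one pure function. A hedged sketch (dicts stand in for `DocumentVersion` rows, with the cached field list trimmed to three; names are illustrative) of the fallback to the new latest version:

```python
def delete_version(versions: dict, doc: dict, deleted_pk: int) -> "int | None":
    """Remove one version; if it was the latest, copy the new latest
    version's cached fields back onto the document. Returns the new
    current version pk, or None when no versions remain."""
    removed = versions.pop(deleted_pk)
    remaining = sorted(versions.values(), key=lambda v: v["version_number"])
    # "Was latest" == nothing left outranks the removed version_number.
    was_latest = all(v["version_number"] < removed["version_number"] for v in remaining)
    if was_latest and remaining:
        new_latest = remaining[-1]
        for field in ("content", "checksum", "mime_type"):
            doc[field] = new_latest[field]
    return remaining[-1]["pk"] if remaining else None


versions = {
    10: {"pk": 10, "version_number": 1, "content": "v1", "checksum": "a", "mime_type": "pdf"},
    11: {"pk": 11, "version_number": 2, "content": "v2", "checksum": "b", "mime_type": "pdf"},
}
doc = {"content": "v2", "checksum": "b", "mime_type": "pdf"}
assert delete_version(versions, doc, 11) == 10   # latest deleted: fall back to v1
assert doc["content"] == "v1"                    # cached fields resynced
```

Note the view never reaches the `None` return in practice: it refuses to delete the only remaining version ("Cannot delete the only remaining version"), so `current_version_id` is only null in degenerate states.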
@@ -1888,6 +1855,7 @@ class DocumentViewSet(
required=False,
allow_null=True,
),
"version_number": serializers.IntegerField(read_only=True),
"is_root": serializers.BooleanField(),
},
),
@@ -1905,40 +1873,37 @@ class DocumentViewSet(
):
return HttpResponseForbidden("Insufficient permissions")
version_doc = self._get_version_doc_for_root(root_doc, version_id)
old_label = version_doc.version_label
version_doc.version_label = serializer.validated_data["version_label"]
version_doc.save(update_fields=["version_label"])
version = self._get_version_for_doc(root_doc, int(version_id))
old_label = version.version_label
version.version_label = serializer.validated_data["version_label"]
version.save(update_fields=["version_label"])
if settings.AUDIT_LOG_ENABLED and old_label != version_doc.version_label:
if settings.AUDIT_LOG_ENABLED and old_label != version.version_label:
actor = (
request.user if request.user and request.user.is_authenticated else None
)
from auditlog.models import LogEntry
LogEntry.objects.log_create(
instance=root_doc,
changes={
"Version Label": [old_label, version_doc.version_label],
},
changes={"Version Label": [old_label, version.version_label]},
action=LogEntry.Action.UPDATE,
actor=actor,
additional_data={
"reason": "Version label updated",
"version_id": version_doc.id,
"version_id": version.pk,
},
)
document_updated.send(
sender=self.__class__,
document=root_doc,
)
document_updated.send(sender=self.__class__, document=root_doc)
return Response(
{
"id": version_doc.id,
"added": version_doc.added,
"version_label": version_doc.version_label,
"checksum": version_doc.checksum,
"is_root": version_doc.id == root_doc.id,
"id": version.pk,
"added": version.added,
"version_label": version.version_label,
"checksum": version.checksum,
"version_number": version.version_number,
"is_root": version.version_number == 1,
},
)
@@ -3935,7 +3900,7 @@ class SharedLinkView(View):
def serve_file(
*,
doc: Document,
doc: "Document | DocumentVersion",
use_archive: bool,
disposition: str,
follow_formatting: bool = False,