Files
paperless-ngx/src/documents/tests/test_task_signals.py
T
Trenton H 50f6b2d4c3 feat(search): wire Tantivy backend into all callsites; remove Whoosh
- Replace all `from documents import index` + Whoosh writer usage across
  admin.py, bulk_edit.py, tasks.py, views.py, signals/handlers.py with
  `get_backend().add_or_update/remove/batch_update`
- Add `effective_content` param to `_build_tantivy_doc` / `add_or_update`
  (used by signal handler to re-index root doc with version's OCR text)
- Add `wipe_index()` (renamed from `_wipe_index`) to public API; use from
  `document_index --recreate` flag
- `index_optimize()` replaced with deprecation log message; Tantivy
  manages segment merging automatically
- `index_reindex()` now calls `get_backend().rebuild()` + `reset_backend()`
  with select_related/prefetch_related for efficiency
- `document_index` management command: add `--recreate` flag
- Status view: use `get_backend()` + dir mtime scan instead of Whoosh
  `ix.last_modified()`
- Delete `documents/index.py`, `test_index.py`, `test_delayedquery.py`
- Update all tests: patch `documents.search.get_backend` (lazy imports);
  `DirectoriesMixin` calls `reset_backend()` in setUp/tearDown;
  `TestDocumentConsumptionFinishedSignal` likewise
- `test_api_search.py`: fix order-independent assertions for date-range
  queries; fix `_rewrite_8digit_date` to be field-aware and
  timezone-correct for DateTimeField vs DateField

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-30 10:43:30 -07:00

249 lines
7.7 KiB
Python

import uuid
from unittest import mock
import celery
from django.test import TestCase
from documents.data_models import ConsumableDocument
from documents.data_models import DocumentMetadataOverrides
from documents.data_models import DocumentSource
from documents.models import Document
from documents.models import PaperlessTask
from documents.signals.handlers import add_to_index
from documents.signals.handlers import before_task_publish_handler
from documents.signals.handlers import task_failure_handler
from documents.signals.handlers import task_postrun_handler
from documents.signals.handlers import task_prerun_handler
from documents.tests.test_consumer import fake_magic_from_file
from documents.tests.utils import DirectoriesMixin
@mock.patch("documents.consumer.magic.from_file", fake_magic_from_file)
class TestTaskSignalHandler(DirectoriesMixin, TestCase):
def util_call_before_task_publish_handler(
self,
headers_to_use,
body_to_use,
) -> None:
"""
Simple utility to call the pre-run handle and ensure it created a single task
instance
"""
self.assertEqual(PaperlessTask.objects.all().count(), 0)
before_task_publish_handler(headers=headers_to_use, body=body_to_use)
self.assertEqual(PaperlessTask.objects.all().count(), 1)
def test_before_task_publish_handler_consume(self) -> None:
"""
GIVEN:
- A celery task is started via the consume folder
WHEN:
- Task before publish handler is called
THEN:
- The task is created and marked as pending
"""
headers = {
"id": str(uuid.uuid4()),
"task": "documents.tasks.consume_file",
}
body = (
# args
(
ConsumableDocument(
source=DocumentSource.ConsumeFolder,
original_file="/consume/hello-999.pdf",
),
DocumentMetadataOverrides(
title="Hello world",
owner_id=1,
),
),
# kwargs
{},
# celery stuff
{"callbacks": None, "errbacks": None, "chain": None, "chord": None},
)
self.util_call_before_task_publish_handler(
headers_to_use=headers,
body_to_use=body,
)
task = PaperlessTask.objects.get()
self.assertIsNotNone(task)
self.assertEqual(headers["id"], task.task_id)
self.assertEqual("hello-999.pdf", task.task_file_name)
self.assertEqual(PaperlessTask.TaskName.CONSUME_FILE, task.task_name)
self.assertEqual(1, task.owner_id)
self.assertEqual(celery.states.PENDING, task.status)
def test_task_prerun_handler(self) -> None:
"""
GIVEN:
- A celery task is started via the consume folder
WHEN:
- Task starts execution
THEN:
- The task is marked as started
"""
headers = {
"id": str(uuid.uuid4()),
"task": "documents.tasks.consume_file",
}
body = (
# args
(
ConsumableDocument(
source=DocumentSource.ConsumeFolder,
original_file="/consume/hello-99.pdf",
),
None,
),
# kwargs
{},
# celery stuff
{"callbacks": None, "errbacks": None, "chain": None, "chord": None},
)
self.util_call_before_task_publish_handler(
headers_to_use=headers,
body_to_use=body,
)
task_prerun_handler(task_id=headers["id"])
task = PaperlessTask.objects.get()
self.assertEqual(celery.states.STARTED, task.status)
def test_task_postrun_handler(self) -> None:
"""
GIVEN:
- A celery task is started via the consume folder
WHEN:
- Task finished execution
THEN:
- The task is marked as started
"""
headers = {
"id": str(uuid.uuid4()),
"task": "documents.tasks.consume_file",
}
body = (
# args
(
ConsumableDocument(
source=DocumentSource.ConsumeFolder,
original_file="/consume/hello-9.pdf",
),
None,
),
# kwargs
{},
# celery stuff
{"callbacks": None, "errbacks": None, "chain": None, "chord": None},
)
self.util_call_before_task_publish_handler(
headers_to_use=headers,
body_to_use=body,
)
task_postrun_handler(
task_id=headers["id"],
retval="Success. New document id 1 created",
state=celery.states.SUCCESS,
)
task = PaperlessTask.objects.get()
self.assertEqual(celery.states.SUCCESS, task.status)
def test_task_failure_handler(self) -> None:
"""
GIVEN:
- A celery task is started via the consume folder
WHEN:
- Task failed execution
THEN:
- The task is marked as failed
"""
headers = {
"id": str(uuid.uuid4()),
"task": "documents.tasks.consume_file",
}
body = (
# args
(
ConsumableDocument(
source=DocumentSource.ConsumeFolder,
original_file="/consume/hello-9.pdf",
),
None,
),
# kwargs
{},
# celery stuff
{"callbacks": None, "errbacks": None, "chain": None, "chord": None},
)
self.util_call_before_task_publish_handler(
headers_to_use=headers,
body_to_use=body,
)
task_failure_handler(
task_id=headers["id"],
exception="Example failure",
)
task = PaperlessTask.objects.get()
self.assertEqual(celery.states.FAILURE, task.status)
def test_add_to_index_indexes_root_once_for_root_documents(self) -> None:
root = Document.objects.create(
title="root",
checksum="root",
mime_type="application/pdf",
)
with mock.patch("documents.search.get_backend") as mock_get_backend:
mock_backend = mock.MagicMock()
mock_get_backend.return_value = mock_backend
add_to_index(sender=None, document=root)
mock_backend.add_or_update.assert_called_once_with(root)
def test_add_to_index_reindexes_root_for_version_documents(self) -> None:
root = Document.objects.create(
title="root",
checksum="root",
mime_type="application/pdf",
)
version = Document.objects.create(
title="version",
checksum="version",
mime_type="application/pdf",
root_document=root,
)
with mock.patch("documents.search.get_backend") as mock_get_backend:
mock_backend = mock.MagicMock()
mock_get_backend.return_value = mock_backend
add_to_index(sender=None, document=version)
self.assertEqual(mock_backend.add_or_update.call_count, 2)
self.assertEqual(
mock_backend.add_or_update.call_args_list[0].args[0].id,
version.id,
)
self.assertEqual(
mock_backend.add_or_update.call_args_list[1].args[0].id,
root.id,
)
self.assertEqual(
mock_backend.add_or_update.call_args_list[1].kwargs,
{"effective_content": version.content},
)