Files
paperless-ngx/src/documents/tests/test_tasks.py
T
Trenton H 8e67828bd7 Feature: Redesign the task system (#12584)
* feat(tasks): replace PaperlessTask model with structured redesign

Drop the old string-based PaperlessTask table and recreate it with
Status/TaskType/TriggerSource enums, JSONField result storage, and
duration tracking fields. Update all call sites to use the new API.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* feat(tasks): rewrite signal handlers to track all task types

Replace the old consume_file-only handler with a full rewrite that tracks
6 task types (consume_file, train_classifier, sanity_check, index_optimize,
llm_index, mail_fetch) with proper trigger source detection, input data
extraction, legacy result string parsing, duration/wait time recording,
and structured error capture on failure.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* test(tasks): add traceback and revoked state coverage to signal tests

* refactor(tasks): remove manual PaperlessTask creation and scheduled/auto params

All task records are now created exclusively via Celery signals (Task 2).
Removed PaperlessTask creation/update from train_classifier, sanity_check,
llmindex_index, and check_sanity. Removed scheduled= and auto= parameters
from all 7 call sites. Updated apply_async callers to use trigger_source
headers instead. Exceptions now propagate naturally from task functions.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* feat(tasks): auto-inject trigger_source=scheduled header for all beat tasks

Inject `headers: {"trigger_source": "scheduled"}` into every Celery beat
schedule entry so signal handlers can identify scheduler-originated tasks
without per-task instrumentation.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* feat(tasks): update serializer, filter, and viewset with v9 backwards compat

- Replace TasksViewSerializer/RunTaskViewSerializer with TaskSerializerV10
  (new field names), TaskSerializerV9 (v9 compat), TaskSummarySerializer,
  and RunTaskSerializer
- Add AcknowledgeTasksViewSerializer unchanged (kept existing validation)
- Expand PaperlessTaskFilterSet with MultipleChoiceFilter for task_type,
  trigger_source, status; add is_complete, date_created_after/before filters
- Replace TasksViewSet.get_serializer_class() to branch on request.version
- Add get_queryset() v9 compat for task_name/type query params
- Add acknowledge_all, summary, active actions to TasksViewSet
- Rewrite run action to use apply_async with trigger_source header
- Add timedelta import to views.py; add MultipleChoiceFilter/DateTimeFilter
  to filters.py imports

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix(tasks): add read_only_fields to TaskSerializerV9, enforce admin via permission_classes on run action

* test(tasks): rewrite API task tests for redesigned model and v9 compat

Replaces the old Django TestCase-based tests with pytest-style classes using
PaperlessTaskFactory. Covers v10 field names, v9 backwards-compat field
mapping, filtering, ordering, acknowledge, acknowledge_all, summary, active,
and run endpoints. Also adds PaperlessTaskFactory to factories.py and fixes
a redundant source= kwarg in TaskSerializerV10.related_document_ids.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* test(tasks): fix two spec gaps in task API test suite

Move test_list_is_owner_aware to TestGetTasksV10 (it tests GET /api/tasks/,
not acknowledge). Add test_related_document_ids_includes_duplicate_of to
cover the duplicate_of path in the related_document_ids property.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* test(tasks): address code quality review findings

Remove trivial field-existence tests per project conventions. Fix
potentially flaky ordering test to use explicit date_created values.
Add is_complete=false filter test, v9 type filter input direction test,
and tighten TestActive second test to target REVOKED specifically.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* feat(tasks): update TaskAdmin for redesigned model

Add date_created, duration_seconds to list_display; add trigger_source
to list_filter; add input_data, duration_seconds, wait_time_seconds to
readonly_fields.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* feat(tasks): update Angular types and service for task redesign

Replace PaperlessTaskName/PaperlessTaskType/PaperlessTaskStatus enums
with new PaperlessTaskType, PaperlessTaskTriggerSource, PaperlessTaskStatus
enums. Update PaperlessTask interface to new field names (task_type,
trigger_source, input_data, result_message, related_document_ids).
Update TasksService to filter by task_type instead of task_name.
Update tasks component and system-status-dialog to use new field names.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* chore(tasks): remove django-celery-results

PaperlessTask now tracks all task results via Celery signals. The
django-celery-results DB backend was write-only -- nothing reads
from it. Drop the package and add a migration to clean up the
orphaned tables.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* test: fix remaining tests broken by task system redesign

Update all tests that created PaperlessTask objects with old field names
to use PaperlessTaskFactory and new field names (task_type, trigger_source,
status, result_message). Use apply_async instead of delay where mocked.
Drop TestCheckSanityTaskRecording — tests PaperlessTask creation that was
intentionally removed from check_sanity().

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* test(tasks): improve test_api_tasks.py structure and add api marker

- Move admin_client, v9_client, user_client fixtures to conftest.py so
  they can be reused by other API tests; all three now build on the
  rest_api_client fixture instead of creating APIClient() directly
- Move regular_user fixture to conftest.py (was already done, now also
  used by the new client fixtures)
- Add docstrings to every test method describing the behaviour under test
- Move timedelta/timezone imports to module level
- Register 'api' pytest marker in pyproject.toml and apply pytestmark to
  the entire file so all 40 tests are selectable via -m api

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* refactor(tasks): simplify task tracking code after redesign

- Extract COMPLETE_STATUSES as a class constant on PaperlessTask,
  eliminating the repeated status tuple across models.py, views.py (3×),
  and filters.py
- Extract _CELERY_STATE_TO_STATUS as a module-level constant instead of
  rebuilding the dict on every task_postrun
- Extract _V9_TYPE_TO_TRIGGER_SOURCE and _RUNNABLE_TASKS as class
  constants on TasksViewSet instead of rebuilding on every request
- Extract _TRIGGER_SOURCE_TO_V9_TYPE as a class constant on
  TaskSerializerV9 instead of rebuilding per serialized object
- Extract _get_consume_args helper to deduplicate identical arg
  extraction logic in _extract_input_data, _determine_trigger_source,
  and _extract_owner_id
- Move inline imports (re, traceback) and Avg to module level
- Fix _DOCUMENT_SOURCE_TO_TRIGGER type annotation key type to
  DocumentSource instead of Any
- Remove redundant truthiness checks in SystemStatusView branches
  already guarded by an is-None check

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* refactor(tasks): add docstrings and rename _parse_legacy_result

- Add docstrings to _extract_input_data, _determine_trigger_source,
  _extract_owner_id explaining what each helper does and why
- Rename _parse_legacy_result -> _parse_consume_result: the function
  parses current consume_file string outputs (consumer.py returns
  "New document id N created" and "It is a duplicate of X (#N)"),
  not legacy data; the old name was misleading

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* feat(tasks): extend and harden the task system redesign

- TaskType: add EMPTY_TRASH, CHECK_WORKFLOWS, CLEANUP_SHARE_LINKS;
  remove INDEX_REBUILD (no backing task — beat schedule uses index_optimize)
- TRACKED_TASKS: wire up all nine task types including the three new ones
  and llmindex_index / process_mail_accounts
- Add task_revoked_handler so cancelled/expired tasks are marked REVOKED
- Fix double-write: task_postrun_handler no longer overwrites result_data
  when status is already FAILURE (task_failure_handler owns that write)
- v9 serialiser: map EMAIL_CONSUME and FOLDER_CONSUME to AUTO_TASK
- views: scope task list to owner for regular users, admins see all;
  validate ?days= query param and return 400 on bad input
- tests: add test_list_admin_sees_all_tasks; rename/fix
  test_parses_duplicate_string (duplicates produce SUCCESS, not FAILURE);
  use PaperlessTaskFactory in modified tests

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix(tasks): fix MAIL_FETCH null input_data and postrun double-query

- _extract_input_data: return {} instead of {"account_ids": None} when
  process_mail_accounts is called without an explicit account list (the
  normal beat-scheduled path); add test to cover this path
- task_postrun_handler: replace filter().first() + filter().update() with
  get() + save(update_fields=[...]) — single fetch, single write,
  consistent with task_prerun_handler

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix(tasks): add queryset stub to satisfy drf-spectacular schema generation

TasksViewSet.get_queryset() accesses request.user, which drf-spectacular
cannot provide during static schema generation.  Adding a class-level
queryset = PaperlessTask.objects.none() gives spectacular a model to
introspect without invoking get_queryset(), eliminating both warnings
and the test_valid_schema failure.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* test(tasks): fill coverage gaps in task system

- test_task_signals: add TestTaskRevokedHandler (marks REVOKED, ignores
  None request, ignores unknown id); switch existing direct
  PaperlessTask.objects.create calls to PaperlessTaskFactory; import
  pytest_mock and use MockerFixture typing on mocker params
- test_api_tasks: add test_rejects_invalid_days_param to TestSummary
- tasks.service.spec: add dismissAllTasks test (POST acknowledge_all +
  reload)
- models: add pragma: no cover to __str__, is_complete, and
  related_document_ids (trivial delegates, covered indirectly)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* Well, that was a bad push.

* Fixes v9 API compatability with testing coverage

* fix(tasks): restore INDEX_OPTIMIZE enum and remove no-op run button

INDEX_OPTIMIZE was dropped from the TaskType enum but still referenced
in _RUNNABLE_TASKS (views.py) and the frontend system-status-dialog,
causing an AttributeError at import time. Restore the enum value in the
model and migration so the serializer accepts it, but remove it from
_RUNNABLE_TASKS since index_optimize is a Tantivy no-op. Remove the
frontend "Run Task" button for index optimization accordingly.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix(tasks): v9 type filter now matches all equivalent trigger sources

The v9 ?type= query param mapped each value to a single TriggerSource,
but the serializer maps multiple sources to the same v9 type value.
A task serialized as "auto_task" would not appear when filtering by
?type=auto_task if its trigger_source was email_consume or
folder_consume. Same issue for "manual_task" missing web_ui and
api_upload sources. Changed to trigger_source__in with the full set
of sources for each v9 type value.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix(tasks): give task_failure_handler full ownership of FAILURE path

task_postrun_handler now early-returns for FAILURE states instead of
redundantly writing status and date_done. task_failure_handler now
computes duration_seconds and wait_time_seconds so failed tasks get
complete timing data. This eliminates a wasted .get() + .save() round
trip on every failed task and gives each handler a clean, non-overlapping
responsibility.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix(tasks): resolve trigger_source header via TriggerSource enum lookup

Replace two hardcoded string comparisons ("scheduled", "system") with a
single TriggerSource(header_source) lookup so the enum values are the
single source of truth. Any valid TriggerSource DB value passed in the
header is accepted; invalid values fall through to the document-source /
MANUAL logic. Update tests to pass enum values in headers rather than raw
strings, and add a test for the invalid-header fallback path.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix(tasks): use TriggerSource enum values at all apply_async call sites

Replace raw strings ("system", "manual") with PaperlessTask.TriggerSource
enum values in the three callers that can import models. The settings
file remains a raw string (models cannot be imported at settings load
time) with a comment pointing to the enum value it must match.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* test(tasks): parametrize repetitive test cases in task test files

test_api_tasks.py:
- Collapse six trigger_source->v9-type tests into one parametrized test,
  adding the previously untested API_UPLOAD case
- Collapse three task_name mapping tests (two remaps + pass-through)
  into one parametrized test
- Collapse two acknowledge_all status tests into one parametrized test
- Collapse two run-endpoint 400 tests into one parametrized test
- Update run/ assertions to use TriggerSource enum values

test_task_signals.py:
- Collapse three trigger_source header tests into one parametrized test
- Collapse two DocumentSource->TriggerSource mapping tests into one
  parametrized test
- Collapse two prerun ignore-invalid-id tests into one parametrized test

All parametrize cases use pytest.param with descriptive ids.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* Handle JSON serialization for datetime and Path.  Further restrist the v9 permissions as Copilot suggests

* That should fix the generated schema/browser

* Use XSerializer for the schema

* A few more basic cases I see no value in covering

* Drops the migration related stuff too.  Just in case we want it again or it confuses people

* fix: annotate tasks_summary_retrieve as array of TaskSummarySerializer

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix: annotate tasks_active_retrieve as array of TaskSerializerV10

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* Restore task running to superuser only

* Removes the acknowledge/dismiss all stuff

* Aligns v10 and v9 task permissions with each other

* Short blurb just to warn users about the tasks being cleared

---------

Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-20 09:28:41 -07:00

380 lines
13 KiB
Python

import shutil
from datetime import timedelta
from pathlib import Path
from unittest import mock
import pytest
from django.conf import settings
from django.test import TestCase
from django.test import override_settings
from django.utils import timezone
from documents import tasks
from documents.models import Correspondent
from documents.models import Document
from documents.models import DocumentType
from documents.models import Tag
from documents.sanity_checker import SanityCheckFailedException
from documents.sanity_checker import SanityCheckMessages
from documents.tests.test_classifier import dummy_preprocess
from documents.tests.utils import DirectoriesMixin
from documents.tests.utils import FileSystemAssertsMixin
@pytest.mark.django_db
class TestIndexOptimize:
def test_index_optimize(self) -> None:
"""Index optimization task must execute without error (Tantivy handles optimization automatically)."""
tasks.index_optimize()
class TestClassifier(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
@mock.patch("documents.tasks.load_classifier")
def test_train_classifier_no_auto_matching(self, load_classifier) -> None:
tasks.train_classifier()
load_classifier.assert_not_called()
@mock.patch("documents.tasks.load_classifier")
def test_train_classifier_with_auto_tag(self, load_classifier) -> None:
load_classifier.return_value = None
Tag.objects.create(matching_algorithm=Tag.MATCH_AUTO, name="test")
with self.assertRaises(ValueError):
tasks.train_classifier()
load_classifier.assert_called_once()
self.assertIsNotFile(settings.MODEL_FILE)
@mock.patch("documents.tasks.load_classifier")
def test_train_classifier_with_auto_type(self, load_classifier) -> None:
load_classifier.return_value = None
DocumentType.objects.create(matching_algorithm=Tag.MATCH_AUTO, name="test")
with self.assertRaises(ValueError):
tasks.train_classifier()
load_classifier.assert_called_once()
self.assertIsNotFile(settings.MODEL_FILE)
@mock.patch("documents.tasks.load_classifier")
def test_train_classifier_with_auto_correspondent(self, load_classifier) -> None:
load_classifier.return_value = None
Correspondent.objects.create(matching_algorithm=Tag.MATCH_AUTO, name="test")
with self.assertRaises(ValueError):
tasks.train_classifier()
load_classifier.assert_called_once()
self.assertIsNotFile(settings.MODEL_FILE)
def test_train_classifier(self) -> None:
c = Correspondent.objects.create(matching_algorithm=Tag.MATCH_AUTO, name="test")
doc = Document.objects.create(correspondent=c, content="test", title="test")
self.assertIsNotFile(settings.MODEL_FILE)
with mock.patch(
"documents.classifier.DocumentClassifier.preprocess_content",
) as pre_proc_mock:
pre_proc_mock.side_effect = dummy_preprocess
tasks.train_classifier()
self.assertIsFile(settings.MODEL_FILE)
mtime = Path(settings.MODEL_FILE).stat().st_mtime
tasks.train_classifier()
self.assertIsFile(settings.MODEL_FILE)
mtime2 = Path(settings.MODEL_FILE).stat().st_mtime
self.assertEqual(mtime, mtime2)
doc.content = "test2"
doc.save()
tasks.train_classifier()
self.assertIsFile(settings.MODEL_FILE)
mtime3 = Path(settings.MODEL_FILE).stat().st_mtime
self.assertNotEqual(mtime2, mtime3)
@pytest.mark.django_db
class TestSanityCheck:
@pytest.fixture
def mock_check_sanity(self, mocker) -> mock.MagicMock:
return mocker.patch("documents.tasks.sanity_checker.check_sanity")
def test_sanity_check_success(self, mock_check_sanity: mock.MagicMock) -> None:
mock_check_sanity.return_value = SanityCheckMessages()
assert tasks.sanity_check() == "No issues detected."
mock_check_sanity.assert_called_once()
def test_sanity_check_error_raises(
self,
mock_check_sanity: mock.MagicMock,
sample_doc: Document,
) -> None:
messages = SanityCheckMessages()
messages.error(sample_doc.pk, "some error")
mock_check_sanity.return_value = messages
with pytest.raises(SanityCheckFailedException):
tasks.sanity_check()
mock_check_sanity.assert_called_once()
def test_sanity_check_error_no_raise(
self,
mock_check_sanity: mock.MagicMock,
sample_doc: Document,
) -> None:
messages = SanityCheckMessages()
messages.error(sample_doc.pk, "some error")
mock_check_sanity.return_value = messages
result = tasks.sanity_check(raise_on_error=False)
assert "1 document(s) with errors" in result
assert "Check logs for details." in result
mock_check_sanity.assert_called_once()
def test_sanity_check_warning_only(
self,
mock_check_sanity: mock.MagicMock,
) -> None:
messages = SanityCheckMessages()
messages.warning(None, "extra file")
mock_check_sanity.return_value = messages
result = tasks.sanity_check()
assert result == "1 global warning(s) found."
mock_check_sanity.assert_called_once()
def test_sanity_check_info_only(
self,
mock_check_sanity: mock.MagicMock,
sample_doc: Document,
) -> None:
messages = SanityCheckMessages()
messages.info(sample_doc.pk, "some info")
mock_check_sanity.return_value = messages
result = tasks.sanity_check()
assert result == "1 document(s) with infos found."
mock_check_sanity.assert_called_once()
def test_sanity_check_errors_warnings_and_infos(
self,
mock_check_sanity: mock.MagicMock,
sample_doc: Document,
) -> None:
messages = SanityCheckMessages()
messages.error(sample_doc.pk, "broken")
messages.warning(sample_doc.pk, "odd")
messages.info(sample_doc.pk, "fyi")
messages.warning(None, "extra file")
mock_check_sanity.return_value = messages
result = tasks.sanity_check(raise_on_error=False)
assert "1 document(s) with errors" in result
assert "1 document(s) with warnings" in result
assert "1 document(s) with infos" in result
assert "1 global warning(s)" in result
assert "Check logs for details." in result
mock_check_sanity.assert_called_once()
class TestBulkUpdate(DirectoriesMixin, TestCase):
def test_bulk_update_documents(self) -> None:
doc1 = Document.objects.create(
title="test",
content="my document",
checksum="wow",
added=timezone.now(),
created=timezone.now(),
modified=timezone.now(),
)
tasks.bulk_update_documents([doc1.pk])
class TestEmptyTrashTask(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
"""
GIVEN:
- Existing document in trash
WHEN:
- Empty trash task is called without doc_ids
THEN:
- Document is only deleted if it has been in trash for more than delay (default 30 days)
"""
def test_empty_trash(self) -> None:
doc = Document.objects.create(
title="test",
content="my document",
checksum="wow",
added=timezone.now(),
created=timezone.now(),
modified=timezone.now(),
)
doc.delete()
self.assertEqual(Document.global_objects.count(), 1)
self.assertEqual(Document.objects.count(), 0)
tasks.empty_trash()
self.assertEqual(Document.global_objects.count(), 1)
doc.deleted_at = timezone.now() - timedelta(days=31)
doc.save()
tasks.empty_trash()
self.assertEqual(Document.global_objects.count(), 0)
@override_settings(ARCHIVE_FILE_GENERATION="always")
class TestUpdateContent(DirectoriesMixin, TestCase):
def test_update_content_maybe_archive_file(self) -> None:
"""
GIVEN:
- Existing document with archive file
WHEN:
- Update content task is called
THEN:
- Document is reprocessed, content and checksum are updated
"""
sample1 = self.dirs.scratch_dir / "sample.pdf"
shutil.copy(
Path(__file__).parent
/ "samples"
/ "documents"
/ "originals"
/ "0000001.pdf",
sample1,
)
sample1_archive = self.dirs.archive_dir / "sample_archive.pdf"
shutil.copy(
Path(__file__).parent
/ "samples"
/ "documents"
/ "originals"
/ "0000001.pdf",
sample1_archive,
)
doc = Document.objects.create(
title="test",
content="my document",
checksum="wow",
archive_checksum="wow",
filename=sample1,
mime_type="application/pdf",
archive_filename=sample1_archive,
)
tasks.update_document_content_maybe_archive_file(doc.pk)
self.assertNotEqual(Document.objects.get(pk=doc.pk).content, "test")
self.assertNotEqual(Document.objects.get(pk=doc.pk).archive_checksum, "wow")
def test_update_content_maybe_archive_file_no_archive(self) -> None:
"""
GIVEN:
- Existing document without archive file
WHEN:
- Update content task is called
THEN:
- Document is reprocessed, content is updated
"""
sample1 = self.dirs.scratch_dir / "sample.pdf"
shutil.copy(
Path(__file__).parent
/ "samples"
/ "documents"
/ "originals"
/ "0000001.pdf",
sample1,
)
doc = Document.objects.create(
title="test",
content="my document",
checksum="wow",
filename=sample1,
mime_type="application/pdf",
)
tasks.update_document_content_maybe_archive_file(doc.pk)
self.assertNotEqual(Document.objects.get(pk=doc.pk).content, "test")
class TestAIIndex(DirectoriesMixin, TestCase):
@override_settings(
AI_ENABLED=True,
LLM_EMBEDDING_BACKEND="huggingface",
)
def test_ai_index_success(self) -> None:
"""
GIVEN:
- Document exists, AI is enabled, llm index backend is set
WHEN:
- llmindex_index task is called
THEN:
- update_llm_index is called and its result is returned
"""
Document.objects.create(
title="test",
content="my document",
checksum="wow",
)
# lazy-loaded so mock the actual function
with mock.patch("paperless_ai.indexing.update_llm_index") as update_llm_index:
update_llm_index.return_value = "LLM index updated successfully."
result = tasks.llmindex_index()
update_llm_index.assert_called_once()
self.assertEqual(result, "LLM index updated successfully.")
@override_settings(
AI_ENABLED=True,
LLM_EMBEDDING_BACKEND="huggingface",
)
def test_ai_index_failure(self) -> None:
"""
GIVEN:
- Document exists, AI is enabled, llm index backend is set
WHEN:
- llmindex_index task is called and update_llm_index raises an exception
THEN:
- the exception propagates to the caller
"""
Document.objects.create(
title="test",
content="my document",
checksum="wow",
)
# lazy-loaded so mock the actual function
with mock.patch("paperless_ai.indexing.update_llm_index") as update_llm_index:
update_llm_index.side_effect = Exception("LLM index update failed.")
with self.assertRaisesRegex(Exception, "LLM index update failed."):
tasks.llmindex_index()
update_llm_index.assert_called_once()
def test_update_document_in_llm_index(self) -> None:
"""
GIVEN:
- Nothing
WHEN:
- update_document_in_llm_index task is called
THEN:
- llm_index_add_or_update_document is called
"""
doc = Document.objects.create(
title="test",
content="my document",
checksum="wow",
)
with mock.patch(
"documents.tasks.llm_index_add_or_update_document",
) as llm_index_add_or_update_document:
tasks.update_document_in_llm_index(doc)
llm_index_add_or_update_document.assert_called_once_with(doc)
def test_remove_document_from_llm_index(self) -> None:
"""
GIVEN:
- Nothing
WHEN:
- remove_document_from_llm_index task is called
THEN:
- llm_index_remove_document is called
"""
doc = Document.objects.create(
title="test",
content="my document",
checksum="wow",
)
with mock.patch(
"documents.tasks.llm_index_remove_document",
) as llm_index_remove_document:
tasks.remove_document_from_llm_index(doc)
llm_index_remove_document.assert_called_once_with(doc)