Feature: Replace Whoosh with tantivy search backend (#12471)

Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-authored-by: Antoine Mérino <3023499+Merinorus@users.noreply.github.com>
This commit is contained in:
Trenton H
2026-04-02 12:38:22 -07:00
committed by GitHub
parent e01a762e81
commit aed9abe48c
52 changed files with 4050 additions and 1708 deletions
+21
View File
@@ -1,5 +1,6 @@
import shutil
import zoneinfo
from collections.abc import Generator
from dataclasses import dataclass
from pathlib import Path
from typing import TYPE_CHECKING
@@ -92,6 +93,26 @@ def sample_doc(
)
@pytest.fixture()
def _search_index(
tmp_path: Path,
settings: SettingsWrapper,
) -> Generator[None, None, None]:
"""Create a temp index directory and point INDEX_DIR at it.
Resets the backend singleton before and after so each test gets a clean
index rather than reusing a stale singleton from another test.
"""
from documents.search import reset_backend
index_dir = tmp_path / "index"
index_dir.mkdir()
settings.INDEX_DIR = index_dir
reset_backend()
yield
reset_backend()
@pytest.fixture()
def settings_timezone(settings: SettingsWrapper) -> zoneinfo.ZoneInfo:
return zoneinfo.ZoneInfo(settings.TIME_ZONE)
+33
View File
@@ -0,0 +1,33 @@
from __future__ import annotations
from typing import TYPE_CHECKING
import pytest
from documents.search._backend import TantivyBackend
from documents.search._backend import reset_backend
if TYPE_CHECKING:
from collections.abc import Generator
from pathlib import Path
from pytest_django.fixtures import SettingsWrapper
@pytest.fixture
def index_dir(tmp_path: Path, settings: SettingsWrapper) -> Path:
path = tmp_path / "index"
path.mkdir()
settings.INDEX_DIR = path
return path
@pytest.fixture
def backend() -> Generator[TantivyBackend, None, None]:
b = TantivyBackend() # path=None → in-memory index
b.open()
try:
yield b
finally:
b.close()
reset_backend()
+502
View File
@@ -0,0 +1,502 @@
import pytest
from django.contrib.auth.models import User
from documents.models import CustomField
from documents.models import CustomFieldInstance
from documents.models import Document
from documents.models import Note
from documents.search._backend import TantivyBackend
from documents.search._backend import get_backend
from documents.search._backend import reset_backend
pytestmark = [pytest.mark.search, pytest.mark.django_db]
class TestWriteBatch:
"""Test WriteBatch context manager functionality."""
def test_rolls_back_on_exception(self, backend: TantivyBackend):
"""Batch operations must rollback on exception to preserve index integrity."""
doc = Document.objects.create(
title="Rollback Target",
content="should survive",
checksum="RB1",
pk=1,
)
backend.add_or_update(doc)
try:
with backend.batch_update() as batch:
batch.remove(doc.pk)
raise RuntimeError("simulated failure")
except RuntimeError:
pass
r = backend.search(
"should survive",
user=None,
page=1,
page_size=10,
sort_field=None,
sort_reverse=False,
)
assert r.total == 1
class TestSearch:
"""Test search functionality."""
def test_scores_normalised_top_hit_is_one(self, backend: TantivyBackend):
"""Search scores must be normalized so top hit has score 1.0 for UI consistency."""
for i, title in enumerate(["bank invoice", "bank statement", "bank receipt"]):
doc = Document.objects.create(
title=title,
content=title,
checksum=f"SN{i}",
pk=10 + i,
)
backend.add_or_update(doc)
r = backend.search(
"bank",
user=None,
page=1,
page_size=10,
sort_field=None,
sort_reverse=False,
)
assert r.hits[0]["score"] == pytest.approx(1.0)
assert all(0.0 <= h["score"] <= 1.0 for h in r.hits)
def test_sort_field_ascending(self, backend: TantivyBackend):
"""Searching with sort_reverse=False must return results in ascending ASN order."""
for asn in [30, 10, 20]:
doc = Document.objects.create(
title="sortable",
content="sortable content",
checksum=f"SFA{asn}",
archive_serial_number=asn,
)
backend.add_or_update(doc)
r = backend.search(
"sortable",
user=None,
page=1,
page_size=10,
sort_field="archive_serial_number",
sort_reverse=False,
)
assert r.total == 3
asns = [Document.objects.get(pk=h["id"]).archive_serial_number for h in r.hits]
assert asns == [10, 20, 30]
def test_sort_field_descending(self, backend: TantivyBackend):
"""Searching with sort_reverse=True must return results in descending ASN order."""
for asn in [30, 10, 20]:
doc = Document.objects.create(
title="sortable",
content="sortable content",
checksum=f"SFD{asn}",
archive_serial_number=asn,
)
backend.add_or_update(doc)
r = backend.search(
"sortable",
user=None,
page=1,
page_size=10,
sort_field="archive_serial_number",
sort_reverse=True,
)
assert r.total == 3
asns = [Document.objects.get(pk=h["id"]).archive_serial_number for h in r.hits]
assert asns == [30, 20, 10]
def test_fuzzy_threshold_filters_low_score_hits(
self,
backend: TantivyBackend,
settings,
):
"""When ADVANCED_FUZZY_SEARCH_THRESHOLD exceeds all normalized scores, hits must be filtered out."""
doc = Document.objects.create(
title="Invoice document",
content="financial report",
checksum="FT1",
pk=120,
)
backend.add_or_update(doc)
# Threshold above 1.0 filters every hit (normalized scores top out at 1.0)
settings.ADVANCED_FUZZY_SEARCH_THRESHOLD = 1.1
r = backend.search(
"invoice",
user=None,
page=1,
page_size=10,
sort_field=None,
sort_reverse=False,
)
assert r.hits == []
def test_owner_filter(self, backend: TantivyBackend):
"""Document owners can search their private documents; other users cannot access them."""
owner = User.objects.create_user("owner")
other = User.objects.create_user("other")
doc = Document.objects.create(
title="Private",
content="secret",
checksum="PF1",
pk=20,
owner=owner,
)
backend.add_or_update(doc)
assert (
backend.search(
"secret",
user=owner,
page=1,
page_size=10,
sort_field=None,
sort_reverse=False,
).total
== 1
)
assert (
backend.search(
"secret",
user=other,
page=1,
page_size=10,
sort_field=None,
sort_reverse=False,
).total
== 0
)
class TestRebuild:
"""Test index rebuilding functionality."""
def test_with_iter_wrapper_called(self, backend: TantivyBackend):
"""Index rebuild must pass documents through iter_wrapper for progress tracking."""
seen = []
def wrapper(docs):
for doc in docs:
seen.append(doc.pk)
yield doc
Document.objects.create(title="Tracked", content="x", checksum="TW1", pk=30)
backend.rebuild(Document.objects.all(), iter_wrapper=wrapper)
assert 30 in seen
class TestAutocomplete:
"""Test autocomplete functionality."""
def test_basic_functionality(self, backend: TantivyBackend):
"""Autocomplete must return words matching the given prefix."""
doc = Document.objects.create(
title="Invoice from Microsoft Corporation",
content="payment details",
checksum="AC1",
pk=40,
)
backend.add_or_update(doc)
results = backend.autocomplete("micro", limit=10)
assert "microsoft" in results
def test_results_ordered_by_document_frequency(self, backend: TantivyBackend):
"""Autocomplete results must be ordered by document frequency to prioritize common terms."""
# "payment" appears in 3 docs; "payslip" in 1 — "pay" prefix should
# return "payment" before "payslip".
for i, (title, checksum) in enumerate(
[
("payment invoice", "AF1"),
("payment receipt", "AF2"),
("payment confirmation", "AF3"),
("payslip march", "AF4"),
],
start=41,
):
doc = Document.objects.create(
title=title,
content="details",
checksum=checksum,
pk=i,
)
backend.add_or_update(doc)
results = backend.autocomplete("pay", limit=10)
assert results.index("payment") < results.index("payslip")
class TestMoreLikeThis:
"""Test more like this functionality."""
def test_excludes_original(self, backend: TantivyBackend):
"""More like this queries must exclude the reference document from results."""
doc1 = Document.objects.create(
title="Important document",
content="financial information",
checksum="MLT1",
pk=50,
)
doc2 = Document.objects.create(
title="Another document",
content="financial report",
checksum="MLT2",
pk=51,
)
backend.add_or_update(doc1)
backend.add_or_update(doc2)
results = backend.more_like_this(doc_id=50, user=None, page=1, page_size=10)
returned_ids = [hit["id"] for hit in results.hits]
assert 50 not in returned_ids # Original document excluded
def test_with_user_applies_permission_filter(self, backend: TantivyBackend):
"""more_like_this with a user must exclude documents that user cannot see."""
viewer = User.objects.create_user("mlt_viewer")
other = User.objects.create_user("mlt_other")
public_doc = Document.objects.create(
title="Public financial document",
content="quarterly financial analysis report figures",
checksum="MLT3",
pk=52,
)
private_doc = Document.objects.create(
title="Private financial document",
content="quarterly financial analysis report figures",
checksum="MLT4",
pk=53,
owner=other,
)
backend.add_or_update(public_doc)
backend.add_or_update(private_doc)
results = backend.more_like_this(doc_id=52, user=viewer, page=1, page_size=10)
returned_ids = [hit["id"] for hit in results.hits]
# private_doc is owned by other, so viewer cannot see it
assert 53 not in returned_ids
def test_document_not_in_index_returns_empty(self, backend: TantivyBackend):
"""more_like_this for a doc_id absent from the index must return empty results."""
results = backend.more_like_this(doc_id=9999, user=None, page=1, page_size=10)
assert results.hits == []
assert results.total == 0
class TestSingleton:
"""Test get_backend() and reset_backend() singleton lifecycle."""
@pytest.fixture(autouse=True)
def _clean(self):
reset_backend()
yield
reset_backend()
def test_returns_same_instance_on_repeated_calls(self, index_dir):
"""Singleton pattern: repeated calls to get_backend() must return the same instance."""
assert get_backend() is get_backend()
def test_reinitializes_when_index_dir_changes(self, tmp_path, settings):
"""Backend singleton must reinitialize when INDEX_DIR setting changes for test isolation."""
settings.INDEX_DIR = tmp_path / "a"
(tmp_path / "a").mkdir()
b1 = get_backend()
settings.INDEX_DIR = tmp_path / "b"
(tmp_path / "b").mkdir()
b2 = get_backend()
assert b1 is not b2
assert b2._path == tmp_path / "b"
def test_reset_forces_new_instance(self, index_dir):
"""reset_backend() must force creation of a new backend instance on next get_backend() call."""
b1 = get_backend()
reset_backend()
b2 = get_backend()
assert b1 is not b2
class TestFieldHandling:
"""Test handling of various document fields."""
def test_none_values_handled_correctly(self, backend: TantivyBackend):
"""Document fields with None values must not cause indexing errors."""
doc = Document.objects.create(
title="Test Doc",
content="test content",
checksum="NV1",
pk=60,
original_filename=None,
page_count=None,
)
# Should not raise an exception
backend.add_or_update(doc)
results = backend.search(
"test",
user=None,
page=1,
page_size=10,
sort_field=None,
sort_reverse=False,
)
assert results.total == 1
def test_custom_fields_include_name_and_value(self, backend: TantivyBackend):
"""Custom fields must be indexed with both field name and value for structured queries."""
# Create a custom field
field = CustomField.objects.create(
name="Invoice Number",
data_type=CustomField.FieldDataType.STRING,
)
doc = Document.objects.create(
title="Invoice",
content="test",
checksum="CF1",
pk=70,
)
CustomFieldInstance.objects.create(
document=doc,
field=field,
value_text="INV-2024-001",
)
# Should not raise an exception during indexing
backend.add_or_update(doc)
results = backend.search(
"invoice",
user=None,
page=1,
page_size=10,
sort_field=None,
sort_reverse=False,
)
assert results.total == 1
def test_select_custom_field_indexes_label_not_id(self, backend: TantivyBackend):
"""SELECT custom fields must index the human-readable label, not the opaque option ID."""
field = CustomField.objects.create(
name="Category",
data_type=CustomField.FieldDataType.SELECT,
extra_data={
"select_options": [
{"id": "opt_abc", "label": "Invoice"},
{"id": "opt_def", "label": "Receipt"},
],
},
)
doc = Document.objects.create(
title="Categorised doc",
content="test",
checksum="SEL1",
pk=71,
)
CustomFieldInstance.objects.create(
document=doc,
field=field,
value_select="opt_abc",
)
backend.add_or_update(doc)
# Label should be findable
results = backend.search(
"custom_fields.value:invoice",
user=None,
page=1,
page_size=10,
sort_field=None,
sort_reverse=False,
)
assert results.total == 1
# Opaque ID must not appear in the index
results = backend.search(
"custom_fields.value:opt_abc",
user=None,
page=1,
page_size=10,
sort_field=None,
sort_reverse=False,
)
assert results.total == 0
def test_none_custom_field_value_not_indexed(self, backend: TantivyBackend):
"""Custom field instances with no value set must not produce an index entry."""
field = CustomField.objects.create(
name="Optional",
data_type=CustomField.FieldDataType.SELECT,
extra_data={"select_options": [{"id": "opt_1", "label": "Yes"}]},
)
doc = Document.objects.create(
title="Unset field doc",
content="test",
checksum="SEL2",
pk=72,
)
CustomFieldInstance.objects.create(
document=doc,
field=field,
value_select=None,
)
backend.add_or_update(doc)
# The string "none" must not appear as an indexed value
results = backend.search(
"custom_fields.value:none",
user=None,
page=1,
page_size=10,
sort_field=None,
sort_reverse=False,
)
assert results.total == 0
def test_notes_include_user_information(self, backend: TantivyBackend):
"""Notes must be indexed with user information when available for structured queries."""
user = User.objects.create_user("notewriter")
doc = Document.objects.create(
title="Doc with notes",
content="test",
checksum="NT1",
pk=80,
)
Note.objects.create(document=doc, note="Important note", user=user)
# Should not raise an exception during indexing
backend.add_or_update(doc)
# Test basic document search first
results = backend.search(
"test",
user=None,
page=1,
page_size=10,
sort_field=None,
sort_reverse=False,
)
assert results.total == 1, (
f"Expected 1, got {results.total}. Document content should be searchable."
)
# Test notes search — must use structured JSON syntax now that note
# is no longer in DEFAULT_SEARCH_FIELDS
results = backend.search(
"notes.note:important",
user=None,
page=1,
page_size=10,
sort_field=None,
sort_reverse=False,
)
assert results.total == 1, (
f"Expected 1, got {results.total}. Note content should be searchable via notes.note: prefix."
)
@@ -0,0 +1,138 @@
import pytest
from documents.tests.utils import TestMigrations
pytestmark = pytest.mark.search
class TestMigrateFulltextQueryFieldPrefixes(TestMigrations):
migrate_from = "0016_sha256_checksums"
migrate_to = "0017_migrate_fulltext_query_field_prefixes"
def setUpBeforeMigration(self, apps) -> None:
User = apps.get_model("auth", "User")
SavedView = apps.get_model("documents", "SavedView")
SavedViewFilterRule = apps.get_model("documents", "SavedViewFilterRule")
user = User.objects.create(username="testuser")
def make_rule(value: str):
view = SavedView.objects.create(
owner=user,
name=f"view-{value}",
sort_field="created",
)
return SavedViewFilterRule.objects.create(
saved_view=view,
rule_type=20, # fulltext query
value=value,
)
# Simple field prefixes
self.rule_note = make_rule("note:invoice")
self.rule_cf = make_rule("custom_field:amount")
# Combined query
self.rule_combined = make_rule("note:invoice AND custom_field:total")
# Parenthesized groups (Whoosh syntax)
self.rule_parens = make_rule("(note:invoice OR note:receipt)")
# Prefix operators
self.rule_plus = make_rule("+note:foo")
self.rule_minus = make_rule("-note:bar")
# Boosted
self.rule_boost = make_rule("note:test^2")
# Should NOT be rewritten — no field prefix match
self.rule_no_match = make_rule("title:hello content:world")
# Should NOT false-positive on word boundaries
self.rule_denote = make_rule("denote:foo")
# Already using new syntax — should be idempotent
self.rule_already_migrated = make_rule("notes.note:foo")
self.rule_already_migrated_cf = make_rule("custom_fields.value:bar")
# Null value — should not crash
view_null = SavedView.objects.create(
owner=user,
name="view-null",
sort_field="created",
)
self.rule_null = SavedViewFilterRule.objects.create(
saved_view=view_null,
rule_type=20,
value=None,
)
# Non-fulltext rule type — should be untouched
view_other = SavedView.objects.create(
owner=user,
name="view-other-type",
sort_field="created",
)
self.rule_other_type = SavedViewFilterRule.objects.create(
saved_view=view_other,
rule_type=0, # title contains
value="note:something",
)
def test_note_prefix_rewritten(self):
self.rule_note.refresh_from_db()
self.assertEqual(self.rule_note.value, "notes.note:invoice")
def test_custom_field_prefix_rewritten(self):
self.rule_cf.refresh_from_db()
self.assertEqual(self.rule_cf.value, "custom_fields.value:amount")
def test_combined_query_rewritten(self):
self.rule_combined.refresh_from_db()
self.assertEqual(
self.rule_combined.value,
"notes.note:invoice AND custom_fields.value:total",
)
def test_parenthesized_groups(self):
self.rule_parens.refresh_from_db()
self.assertEqual(
self.rule_parens.value,
"(notes.note:invoice OR notes.note:receipt)",
)
def test_plus_prefix(self):
self.rule_plus.refresh_from_db()
self.assertEqual(self.rule_plus.value, "+notes.note:foo")
def test_minus_prefix(self):
self.rule_minus.refresh_from_db()
self.assertEqual(self.rule_minus.value, "-notes.note:bar")
def test_boosted(self):
self.rule_boost.refresh_from_db()
self.assertEqual(self.rule_boost.value, "notes.note:test^2")
def test_no_match_unchanged(self):
self.rule_no_match.refresh_from_db()
self.assertEqual(self.rule_no_match.value, "title:hello content:world")
def test_word_boundary_no_false_positive(self):
self.rule_denote.refresh_from_db()
self.assertEqual(self.rule_denote.value, "denote:foo")
def test_already_migrated_idempotent(self):
self.rule_already_migrated.refresh_from_db()
self.assertEqual(self.rule_already_migrated.value, "notes.note:foo")
def test_already_migrated_cf_idempotent(self):
self.rule_already_migrated_cf.refresh_from_db()
self.assertEqual(self.rule_already_migrated_cf.value, "custom_fields.value:bar")
def test_null_value_no_crash(self):
self.rule_null.refresh_from_db()
self.assertIsNone(self.rule_null.value)
def test_non_fulltext_rule_untouched(self):
self.rule_other_type.refresh_from_db()
self.assertEqual(self.rule_other_type.value, "note:something")
+530
View File
@@ -0,0 +1,530 @@
from __future__ import annotations
import re
from datetime import UTC
from datetime import datetime
from datetime import tzinfo
from typing import TYPE_CHECKING
from zoneinfo import ZoneInfo
import pytest
import tantivy
import time_machine
from documents.search._query import _date_only_range
from documents.search._query import _datetime_range
from documents.search._query import _rewrite_compact_date
from documents.search._query import build_permission_filter
from documents.search._query import normalize_query
from documents.search._query import parse_user_query
from documents.search._query import rewrite_natural_date_keywords
from documents.search._schema import build_schema
from documents.search._tokenizer import register_tokenizers
if TYPE_CHECKING:
from django.contrib.auth.base_user import AbstractBaseUser
pytestmark = pytest.mark.search
EASTERN = ZoneInfo("America/New_York") # UTC-5 / UTC-4 (DST)
AUCKLAND = ZoneInfo("Pacific/Auckland") # UTC+13 in southern-hemisphere summer
def _range(result: str, field: str) -> tuple[str, str]:
m = re.search(rf"{field}:\[(.+?) TO (.+?)\]", result)
assert m, f"No range for {field!r} in: {result!r}"
return m.group(1), m.group(2)
class TestCreatedDateField:
"""
created is a Django DateField: indexed as midnight UTC of the local calendar
date. No offset arithmetic needed - the local calendar date is what matters.
"""
@pytest.mark.parametrize(
("tz", "expected_lo", "expected_hi"),
[
pytest.param(UTC, "2026-03-28T00:00:00Z", "2026-03-29T00:00:00Z", id="utc"),
pytest.param(
EASTERN,
"2026-03-28T00:00:00Z",
"2026-03-29T00:00:00Z",
id="eastern_same_calendar_date",
),
],
)
@time_machine.travel(datetime(2026, 3, 28, 15, 30, tzinfo=UTC), tick=False)
def test_today(self, tz: tzinfo, expected_lo: str, expected_hi: str) -> None:
lo, hi = _range(rewrite_natural_date_keywords("created:today", tz), "created")
assert lo == expected_lo
assert hi == expected_hi
@time_machine.travel(datetime(2026, 3, 28, 3, 0, tzinfo=UTC), tick=False)
def test_today_auckland_ahead_of_utc(self) -> None:
# UTC 03:00 -> Auckland (UTC+13) = 16:00 same date; local date = 2026-03-28
lo, _ = _range(
rewrite_natural_date_keywords("created:today", AUCKLAND),
"created",
)
assert lo == "2026-03-28T00:00:00Z"
@pytest.mark.parametrize(
("field", "keyword", "expected_lo", "expected_hi"),
[
pytest.param(
"created",
"yesterday",
"2026-03-27T00:00:00Z",
"2026-03-28T00:00:00Z",
id="yesterday",
),
pytest.param(
"created",
"this_week",
"2026-03-23T00:00:00Z",
"2026-03-30T00:00:00Z",
id="this_week_mon_sun",
),
pytest.param(
"created",
"last_week",
"2026-03-16T00:00:00Z",
"2026-03-23T00:00:00Z",
id="last_week",
),
pytest.param(
"created",
"this_month",
"2026-03-01T00:00:00Z",
"2026-04-01T00:00:00Z",
id="this_month",
),
pytest.param(
"created",
"last_month",
"2026-02-01T00:00:00Z",
"2026-03-01T00:00:00Z",
id="last_month",
),
pytest.param(
"created",
"this_year",
"2026-01-01T00:00:00Z",
"2027-01-01T00:00:00Z",
id="this_year",
),
pytest.param(
"created",
"last_year",
"2025-01-01T00:00:00Z",
"2026-01-01T00:00:00Z",
id="last_year",
),
],
)
@time_machine.travel(datetime(2026, 3, 28, 15, 0, tzinfo=UTC), tick=False)
def test_date_keywords(
self,
field: str,
keyword: str,
expected_lo: str,
expected_hi: str,
) -> None:
# 2026-03-28 is Saturday; Mon-Sun week calculation built into expectations
query = f"{field}:{keyword}"
lo, hi = _range(rewrite_natural_date_keywords(query, UTC), field)
assert lo == expected_lo
assert hi == expected_hi
@time_machine.travel(datetime(2026, 12, 15, 12, 0, tzinfo=UTC), tick=False)
def test_this_month_december_wraps_to_next_year(self) -> None:
# December: next month must roll over to January 1 of next year
lo, hi = _range(
rewrite_natural_date_keywords("created:this_month", UTC),
"created",
)
assert lo == "2026-12-01T00:00:00Z"
assert hi == "2027-01-01T00:00:00Z"
@time_machine.travel(datetime(2026, 1, 15, 12, 0, tzinfo=UTC), tick=False)
def test_last_month_january_wraps_to_previous_year(self) -> None:
# January: last month must roll back to December 1 of previous year
lo, hi = _range(
rewrite_natural_date_keywords("created:last_month", UTC),
"created",
)
assert lo == "2025-12-01T00:00:00Z"
assert hi == "2026-01-01T00:00:00Z"
def test_unknown_keyword_raises(self) -> None:
with pytest.raises(ValueError, match="Unknown keyword"):
_date_only_range("bogus_keyword", UTC)
class TestDateTimeFields:
"""
added/modified store full UTC datetimes. Natural keywords must convert
the local day boundaries to UTC - timezone offset arithmetic IS required.
"""
@time_machine.travel(datetime(2026, 3, 28, 15, 30, tzinfo=UTC), tick=False)
def test_added_today_eastern(self) -> None:
# EDT = UTC-4; local midnight 2026-03-28 00:00 EDT = 2026-03-28 04:00 UTC
lo, hi = _range(rewrite_natural_date_keywords("added:today", EASTERN), "added")
assert lo == "2026-03-28T04:00:00Z"
assert hi == "2026-03-29T04:00:00Z"
@time_machine.travel(datetime(2026, 3, 29, 2, 0, tzinfo=UTC), tick=False)
def test_added_today_auckland_midnight_crossing(self) -> None:
# UTC 02:00 on 2026-03-29 -> Auckland (UTC+13) = 2026-03-29 15:00 local
# Auckland midnight = UTC 2026-03-28 11:00
lo, hi = _range(rewrite_natural_date_keywords("added:today", AUCKLAND), "added")
assert lo == "2026-03-28T11:00:00Z"
assert hi == "2026-03-29T11:00:00Z"
@time_machine.travel(datetime(2026, 3, 28, 15, 0, tzinfo=UTC), tick=False)
def test_modified_today_utc(self) -> None:
lo, hi = _range(
rewrite_natural_date_keywords("modified:today", UTC),
"modified",
)
assert lo == "2026-03-28T00:00:00Z"
assert hi == "2026-03-29T00:00:00Z"
@pytest.mark.parametrize(
("keyword", "expected_lo", "expected_hi"),
[
pytest.param(
"yesterday",
"2026-03-27T00:00:00Z",
"2026-03-28T00:00:00Z",
id="yesterday",
),
pytest.param(
"this_week",
"2026-03-23T00:00:00Z",
"2026-03-30T00:00:00Z",
id="this_week",
),
pytest.param(
"last_week",
"2026-03-16T00:00:00Z",
"2026-03-23T00:00:00Z",
id="last_week",
),
pytest.param(
"this_month",
"2026-03-01T00:00:00Z",
"2026-04-01T00:00:00Z",
id="this_month",
),
pytest.param(
"last_month",
"2026-02-01T00:00:00Z",
"2026-03-01T00:00:00Z",
id="last_month",
),
pytest.param(
"this_year",
"2026-01-01T00:00:00Z",
"2027-01-01T00:00:00Z",
id="this_year",
),
pytest.param(
"last_year",
"2025-01-01T00:00:00Z",
"2026-01-01T00:00:00Z",
id="last_year",
),
],
)
@time_machine.travel(datetime(2026, 3, 28, 12, 0, tzinfo=UTC), tick=False)
def test_datetime_keywords_utc(
self,
keyword: str,
expected_lo: str,
expected_hi: str,
) -> None:
# 2026-03-28 is Saturday; weekday()==5 so Monday=2026-03-23
lo, hi = _range(rewrite_natural_date_keywords(f"added:{keyword}", UTC), "added")
assert lo == expected_lo
assert hi == expected_hi
@time_machine.travel(datetime(2026, 12, 15, 12, 0, tzinfo=UTC), tick=False)
def test_this_month_december_wraps_to_next_year(self) -> None:
# December: next month wraps to January of next year
lo, hi = _range(rewrite_natural_date_keywords("added:this_month", UTC), "added")
assert lo == "2026-12-01T00:00:00Z"
assert hi == "2027-01-01T00:00:00Z"
@time_machine.travel(datetime(2026, 1, 15, 12, 0, tzinfo=UTC), tick=False)
def test_last_month_january_wraps_to_previous_year(self) -> None:
# January: last month wraps back to December of previous year
lo, hi = _range(rewrite_natural_date_keywords("added:last_month", UTC), "added")
assert lo == "2025-12-01T00:00:00Z"
assert hi == "2026-01-01T00:00:00Z"
def test_unknown_keyword_raises(self) -> None:
with pytest.raises(ValueError, match="Unknown keyword"):
_datetime_range("bogus_keyword", UTC)
class TestWhooshQueryRewriting:
"""All Whoosh query syntax variants must be rewritten to ISO 8601 before Tantivy parses them."""
@time_machine.travel(datetime(2026, 3, 28, 15, 0, tzinfo=UTC), tick=False)
def test_compact_date_shim_rewrites_to_iso(self) -> None:
result = rewrite_natural_date_keywords("created:20240115120000", UTC)
assert "2024-01-15" in result
assert "20240115120000" not in result
@time_machine.travel(datetime(2026, 3, 28, 15, 0, tzinfo=UTC), tick=False)
def test_relative_range_shim_removes_now(self) -> None:
result = rewrite_natural_date_keywords("added:[now-7d TO now]", UTC)
assert "now" not in result
assert "2026-03-" in result
@time_machine.travel(datetime(2026, 3, 28, 12, 0, tzinfo=UTC), tick=False)
def test_bracket_minus_7_days(self) -> None:
lo, hi = _range(
rewrite_natural_date_keywords("added:[-7 days to now]", UTC),
"added",
)
assert lo == "2026-03-21T12:00:00Z"
assert hi == "2026-03-28T12:00:00Z"
@time_machine.travel(datetime(2026, 3, 28, 12, 0, tzinfo=UTC), tick=False)
def test_bracket_minus_1_week(self) -> None:
lo, hi = _range(
rewrite_natural_date_keywords("added:[-1 week to now]", UTC),
"added",
)
assert lo == "2026-03-21T12:00:00Z"
assert hi == "2026-03-28T12:00:00Z"
@time_machine.travel(datetime(2026, 3, 28, 12, 0, tzinfo=UTC), tick=False)
def test_bracket_minus_1_month_uses_relativedelta(self) -> None:
# relativedelta(months=1) from 2026-03-28 = 2026-02-28 (not 29)
lo, hi = _range(
rewrite_natural_date_keywords("created:[-1 month to now]", UTC),
"created",
)
assert lo == "2026-02-28T12:00:00Z"
assert hi == "2026-03-28T12:00:00Z"
@time_machine.travel(datetime(2026, 3, 28, 12, 0, tzinfo=UTC), tick=False)
def test_bracket_minus_1_year(self) -> None:
lo, hi = _range(
rewrite_natural_date_keywords("modified:[-1 year to now]", UTC),
"modified",
)
assert lo == "2025-03-28T12:00:00Z"
assert hi == "2026-03-28T12:00:00Z"
@time_machine.travel(datetime(2026, 3, 28, 12, 0, tzinfo=UTC), tick=False)
def test_bracket_plural_unit_hours(self) -> None:
lo, hi = _range(
rewrite_natural_date_keywords("added:[-3 hours to now]", UTC),
"added",
)
assert lo == "2026-03-28T09:00:00Z"
assert hi == "2026-03-28T12:00:00Z"
@time_machine.travel(datetime(2026, 3, 28, 12, 0, tzinfo=UTC), tick=False)
def test_bracket_case_insensitive(self) -> None:
result = rewrite_natural_date_keywords("added:[-1 WEEK TO NOW]", UTC)
assert "now" not in result.lower()
lo, hi = _range(result, "added")
assert lo == "2026-03-21T12:00:00Z"
assert hi == "2026-03-28T12:00:00Z"
@time_machine.travel(datetime(2026, 3, 28, 12, 0, tzinfo=UTC), tick=False)
def test_relative_range_swaps_bounds_when_lo_exceeds_hi(self) -> None:
# [now+1h TO now-1h] has lo > hi before substitution; they must be swapped
lo, hi = _range(
rewrite_natural_date_keywords("added:[now+1h TO now-1h]", UTC),
"added",
)
assert lo == "2026-03-28T11:00:00Z"
assert hi == "2026-03-28T13:00:00Z"
def test_8digit_created_date_field_always_uses_utc_midnight(self) -> None:
# created is a DateField: boundaries are always UTC midnight, no TZ offset
result = rewrite_natural_date_keywords("created:20231201", EASTERN)
lo, hi = _range(result, "created")
assert lo == "2023-12-01T00:00:00Z"
assert hi == "2023-12-02T00:00:00Z"
def test_8digit_added_datetime_field_converts_local_midnight_to_utc(self) -> None:
# added is DateTimeField: midnight Dec 1 Eastern (EST = UTC-5) = 05:00 UTC
result = rewrite_natural_date_keywords("added:20231201", EASTERN)
lo, hi = _range(result, "added")
assert lo == "2023-12-01T05:00:00Z"
assert hi == "2023-12-02T05:00:00Z"
def test_8digit_modified_datetime_field_converts_local_midnight_to_utc(
self,
) -> None:
result = rewrite_natural_date_keywords("modified:20231201", EASTERN)
lo, hi = _range(result, "modified")
assert lo == "2023-12-01T05:00:00Z"
assert hi == "2023-12-02T05:00:00Z"
def test_8digit_invalid_date_passes_through_unchanged(self) -> None:
assert rewrite_natural_date_keywords("added:20231340", UTC) == "added:20231340"
def test_compact_14digit_invalid_date_passes_through_unchanged(self) -> None:
# Month=13 makes datetime() raise ValueError; the token must be left as-is
assert _rewrite_compact_date("20231300120000") == "20231300120000"
class TestParseUserQuery:
"""parse_user_query runs the full preprocessing pipeline."""
@pytest.fixture
def query_index(self) -> tantivy.Index:
schema = build_schema()
idx = tantivy.Index(schema, path=None)
register_tokenizers(idx, "")
return idx
def test_returns_tantivy_query(self, query_index: tantivy.Index) -> None:
assert isinstance(parse_user_query(query_index, "invoice", UTC), tantivy.Query)
def test_fuzzy_mode_does_not_raise(
self,
query_index: tantivy.Index,
settings,
) -> None:
settings.ADVANCED_FUZZY_SEARCH_THRESHOLD = 0.5
assert isinstance(parse_user_query(query_index, "invoice", UTC), tantivy.Query)
def test_date_rewriting_applied_before_tantivy_parse(
self,
query_index: tantivy.Index,
) -> None:
# created:today must be rewritten to an ISO range before Tantivy parses it;
# if passed raw, Tantivy would reject "today" as an invalid date value
with time_machine.travel(datetime(2026, 3, 28, 12, 0, tzinfo=UTC), tick=False):
q = parse_user_query(query_index, "created:today", UTC)
assert isinstance(q, tantivy.Query)
class TestPassthrough:
"""Queries without field prefixes or unrelated content pass through unchanged."""
def test_bare_keyword_no_field_prefix_unchanged(self) -> None:
# Bare 'today' with no field: prefix passes through unchanged
result = rewrite_natural_date_keywords("bank statement today", UTC)
assert "today" in result
def test_unrelated_query_unchanged(self) -> None:
assert rewrite_natural_date_keywords("title:invoice", UTC) == "title:invoice"
class TestNormalizeQuery:
"""normalize_query expands comma-separated values and collapses whitespace."""
def test_normalize_expands_comma_separated_tags(self) -> None:
assert normalize_query("tag:foo,bar") == "tag:foo AND tag:bar"
def test_normalize_expands_three_values(self) -> None:
assert normalize_query("tag:foo,bar,baz") == "tag:foo AND tag:bar AND tag:baz"
def test_normalize_collapses_whitespace(self) -> None:
assert normalize_query("bank statement") == "bank statement"
def test_normalize_no_commas_unchanged(self) -> None:
assert normalize_query("bank statement") == "bank statement"
class TestPermissionFilter:
"""
build_permission_filter tests use an in-memory index — no DB access needed.
Users are constructed as unsaved model instances (django_user_model(pk=N))
so no database round-trip occurs; only .pk is read by build_permission_filter.
"""
@pytest.fixture
def perm_index(self) -> tantivy.Index:
schema = build_schema()
idx = tantivy.Index(schema, path=None)
register_tokenizers(idx, "")
return idx
def _add_doc(
self,
idx: tantivy.Index,
doc_id: int,
owner_id: int | None = None,
viewer_ids: tuple[int, ...] = (),
) -> None:
writer = idx.writer()
doc = tantivy.Document()
doc.add_unsigned("id", doc_id)
# Only add owner_id field if the document has an owner
if owner_id is not None:
doc.add_unsigned("owner_id", owner_id)
for vid in viewer_ids:
doc.add_unsigned("viewer_id", vid)
writer.add_document(doc)
writer.commit()
idx.reload()
def test_perm_no_owner_visible_to_any_user(
self,
perm_index: tantivy.Index,
django_user_model: type[AbstractBaseUser],
) -> None:
"""Documents with no owner must be visible to every user."""
self._add_doc(perm_index, doc_id=1, owner_id=None)
user = django_user_model(pk=99)
perm = build_permission_filter(perm_index.schema, user)
assert perm_index.searcher().search(perm, limit=10).count == 1
def test_perm_owned_by_user_is_visible(
self,
perm_index: tantivy.Index,
django_user_model: type[AbstractBaseUser],
) -> None:
"""A document owned by the requesting user must be visible."""
self._add_doc(perm_index, doc_id=2, owner_id=42)
user = django_user_model(pk=42)
perm = build_permission_filter(perm_index.schema, user)
assert perm_index.searcher().search(perm, limit=10).count == 1
def test_perm_owned_by_other_not_visible(
self,
perm_index: tantivy.Index,
django_user_model: type[AbstractBaseUser],
) -> None:
"""A document owned by a different user must not be visible."""
self._add_doc(perm_index, doc_id=3, owner_id=42)
user = django_user_model(pk=99)
perm = build_permission_filter(perm_index.schema, user)
assert perm_index.searcher().search(perm, limit=10).count == 0
def test_perm_shared_viewer_is_visible(
self,
perm_index: tantivy.Index,
django_user_model: type[AbstractBaseUser],
) -> None:
"""A document explicitly shared with a user must be visible to that user."""
self._add_doc(perm_index, doc_id=4, owner_id=42, viewer_ids=(99,))
user = django_user_model(pk=99)
perm = build_permission_filter(perm_index.schema, user)
assert perm_index.searcher().search(perm, limit=10).count == 1
def test_perm_only_owned_docs_hidden_from_others(
self,
perm_index: tantivy.Index,
django_user_model: type[AbstractBaseUser],
) -> None:
"""Only unowned documents appear when the user owns none of them."""
self._add_doc(perm_index, doc_id=5, owner_id=10) # owned by 10
self._add_doc(perm_index, doc_id=6, owner_id=None) # unowned
user = django_user_model(pk=20)
perm = build_permission_filter(perm_index.schema, user)
assert perm_index.searcher().search(perm, limit=10).count == 1 # only unowned
+63
View File
@@ -0,0 +1,63 @@
from __future__ import annotations
from typing import TYPE_CHECKING
import pytest
from documents.search._schema import SCHEMA_VERSION
from documents.search._schema import needs_rebuild
if TYPE_CHECKING:
from pathlib import Path
from pytest_django.fixtures import SettingsWrapper
pytestmark = pytest.mark.search
class TestNeedsRebuild:
"""needs_rebuild covers all sentinel-file states that require a full reindex."""
def test_returns_true_when_version_file_missing(self, index_dir: Path) -> None:
assert needs_rebuild(index_dir) is True
def test_returns_false_when_version_and_language_match(
self,
index_dir: Path,
settings: SettingsWrapper,
) -> None:
settings.SEARCH_LANGUAGE = "en"
(index_dir / ".schema_version").write_text(str(SCHEMA_VERSION))
(index_dir / ".schema_language").write_text("en")
assert needs_rebuild(index_dir) is False
def test_returns_true_on_schema_version_mismatch(self, index_dir: Path) -> None:
(index_dir / ".schema_version").write_text(str(SCHEMA_VERSION - 1))
assert needs_rebuild(index_dir) is True
def test_returns_true_when_version_file_not_an_integer(
self,
index_dir: Path,
) -> None:
(index_dir / ".schema_version").write_text("not-a-number")
assert needs_rebuild(index_dir) is True
def test_returns_true_when_language_sentinel_missing(
self,
index_dir: Path,
settings: SettingsWrapper,
) -> None:
settings.SEARCH_LANGUAGE = "en"
(index_dir / ".schema_version").write_text(str(SCHEMA_VERSION))
# .schema_language intentionally absent
assert needs_rebuild(index_dir) is True
def test_returns_true_when_language_sentinel_content_differs(
self,
index_dir: Path,
settings: SettingsWrapper,
) -> None:
settings.SEARCH_LANGUAGE = "de"
(index_dir / ".schema_version").write_text(str(SCHEMA_VERSION))
(index_dir / ".schema_language").write_text("en")
assert needs_rebuild(index_dir) is True
@@ -0,0 +1,78 @@
from __future__ import annotations
import logging
from typing import TYPE_CHECKING
import pytest
import tantivy
from documents.search._tokenizer import _bigram_analyzer
from documents.search._tokenizer import _paperless_text
from documents.search._tokenizer import register_tokenizers
if TYPE_CHECKING:
from _pytest.logging import LogCaptureFixture
pytestmark = pytest.mark.search
class TestTokenizers:
@pytest.fixture
def content_index(self) -> tantivy.Index:
"""Index with just a content field for ASCII folding tests."""
sb = tantivy.SchemaBuilder()
sb.add_text_field("content", stored=True, tokenizer_name="paperless_text")
schema = sb.build()
idx = tantivy.Index(schema, path=None)
idx.register_tokenizer("paperless_text", _paperless_text(""))
return idx
@pytest.fixture
def bigram_index(self) -> tantivy.Index:
"""Index with bigram field for CJK tests."""
sb = tantivy.SchemaBuilder()
sb.add_text_field(
"bigram_content",
stored=False,
tokenizer_name="bigram_analyzer",
)
schema = sb.build()
idx = tantivy.Index(schema, path=None)
idx.register_tokenizer("bigram_analyzer", _bigram_analyzer())
return idx
def test_ascii_fold_finds_accented_content(
self,
content_index: tantivy.Index,
) -> None:
"""ASCII folding allows searching accented text with plain ASCII queries."""
writer = content_index.writer()
doc = tantivy.Document()
doc.add_text("content", "café résumé")
writer.add_document(doc)
writer.commit()
content_index.reload()
q = content_index.parse_query("cafe resume", ["content"])
assert content_index.searcher().search(q, limit=5).count == 1
def test_bigram_finds_cjk_substring(self, bigram_index: tantivy.Index) -> None:
"""Bigram tokenizer enables substring search in CJK languages without whitespace delimiters."""
writer = bigram_index.writer()
doc = tantivy.Document()
doc.add_text("bigram_content", "東京都")
writer.add_document(doc)
writer.commit()
bigram_index.reload()
q = bigram_index.parse_query("東京", ["bigram_content"])
assert bigram_index.searcher().search(q, limit=5).count == 1
def test_unsupported_language_logs_warning(self, caplog: LogCaptureFixture) -> None:
"""Unsupported language codes should log a warning and disable stemming gracefully."""
sb = tantivy.SchemaBuilder()
sb.add_text_field("content", stored=True, tokenizer_name="paperless_text")
schema = sb.build()
idx = tantivy.Index(schema, path=None)
with caplog.at_level(logging.WARNING, logger="paperless.search"):
register_tokenizers(idx, "klingon")
assert "klingon" in caplog.text
+26 -7
View File
@@ -1,6 +1,7 @@
import types
from unittest.mock import patch
import tantivy
from django.contrib.admin.sites import AdminSite
from django.contrib.auth.models import Permission
from django.contrib.auth.models import User
@@ -8,36 +9,54 @@ from django.test import TestCase
from django.utils import timezone
from rest_framework import status
from documents import index
from documents.admin import DocumentAdmin
from documents.admin import TagAdmin
from documents.models import Document
from documents.models import Tag
from documents.search import get_backend
from documents.search import reset_backend
from documents.tests.utils import DirectoriesMixin
from paperless.admin import PaperlessUserAdmin
class TestDocumentAdmin(DirectoriesMixin, TestCase):
def get_document_from_index(self, doc):
ix = index.open_index()
with ix.searcher() as searcher:
return searcher.document(id=doc.id)
backend = get_backend()
searcher = backend._index.searcher()
results = searcher.search(
tantivy.Query.range_query(
backend._schema,
"id",
tantivy.FieldType.Unsigned,
doc.pk,
doc.pk,
),
limit=1,
)
if results.hits:
return searcher.doc(results.hits[0][1]).to_dict()
return None
def setUp(self) -> None:
super().setUp()
reset_backend()
self.doc_admin = DocumentAdmin(model=Document, admin_site=AdminSite())
def tearDown(self) -> None:
reset_backend()
super().tearDown()
def test_save_model(self) -> None:
doc = Document.objects.create(title="test")
doc.title = "new title"
self.doc_admin.save_model(None, doc, None, None)
self.assertEqual(Document.objects.get(id=doc.id).title, "new title")
self.assertEqual(self.get_document_from_index(doc)["id"], doc.id)
self.assertEqual(self.get_document_from_index(doc)["id"], [doc.id])
def test_delete_model(self) -> None:
doc = Document.objects.create(title="test")
index.add_or_update_document(doc)
get_backend().add_or_update(doc)
self.assertIsNotNone(self.get_document_from_index(doc))
self.doc_admin.delete_model(None, doc)
@@ -53,7 +72,7 @@ class TestDocumentAdmin(DirectoriesMixin, TestCase):
checksum=f"{i:02}",
)
docs.append(doc)
index.add_or_update_document(doc)
get_backend().add_or_update(doc)
self.assertEqual(Document.objects.count(), 42)
@@ -109,7 +109,7 @@ class TestDocumentVersioningApi(DirectoriesMixin, APITestCase):
mime_type="application/pdf",
)
with mock.patch("documents.index.remove_document_from_index"):
with mock.patch("documents.search.get_backend"):
resp = self.client.delete(f"/api/documents/{root.id}/versions/{root.id}/")
self.assertEqual(resp.status_code, status.HTTP_400_BAD_REQUEST)
@@ -137,10 +137,7 @@ class TestDocumentVersioningApi(DirectoriesMixin, APITestCase):
content="v2-content",
)
with (
mock.patch("documents.index.remove_document_from_index"),
mock.patch("documents.index.add_or_update_document"),
):
with mock.patch("documents.search.get_backend"):
resp = self.client.delete(f"/api/documents/{root.id}/versions/{v2.id}/")
self.assertEqual(resp.status_code, status.HTTP_200_OK)
@@ -149,10 +146,7 @@ class TestDocumentVersioningApi(DirectoriesMixin, APITestCase):
root.refresh_from_db()
self.assertEqual(root.content, "root-content")
with (
mock.patch("documents.index.remove_document_from_index"),
mock.patch("documents.index.add_or_update_document"),
):
with mock.patch("documents.search.get_backend"):
resp = self.client.delete(f"/api/documents/{root.id}/versions/{v1.id}/")
self.assertEqual(resp.status_code, status.HTTP_200_OK)
@@ -175,10 +169,7 @@ class TestDocumentVersioningApi(DirectoriesMixin, APITestCase):
)
version_id = version.id
with (
mock.patch("documents.index.remove_document_from_index"),
mock.patch("documents.index.add_or_update_document"),
):
with mock.patch("documents.search.get_backend"):
resp = self.client.delete(
f"/api/documents/{root.id}/versions/{version_id}/",
)
@@ -225,7 +216,7 @@ class TestDocumentVersioningApi(DirectoriesMixin, APITestCase):
root_document=other_root,
)
with mock.patch("documents.index.remove_document_from_index"):
with mock.patch("documents.search.get_backend"):
resp = self.client.delete(
f"/api/documents/{root.id}/versions/{other_version.id}/",
)
@@ -245,10 +236,7 @@ class TestDocumentVersioningApi(DirectoriesMixin, APITestCase):
root_document=root,
)
with (
mock.patch("documents.index.remove_document_from_index"),
mock.patch("documents.index.add_or_update_document"),
):
with mock.patch("documents.search.get_backend"):
resp = self.client.delete(
f"/api/documents/{version.id}/versions/{version.id}/",
)
@@ -275,18 +263,17 @@ class TestDocumentVersioningApi(DirectoriesMixin, APITestCase):
root_document=root,
)
with (
mock.patch("documents.index.remove_document_from_index") as remove_index,
mock.patch("documents.index.add_or_update_document") as add_or_update,
):
with mock.patch("documents.search.get_backend") as mock_get_backend:
mock_backend = mock.MagicMock()
mock_get_backend.return_value = mock_backend
resp = self.client.delete(
f"/api/documents/{root.id}/versions/{version.id}/",
)
self.assertEqual(resp.status_code, status.HTTP_200_OK)
remove_index.assert_called_once_with(version)
add_or_update.assert_called_once()
self.assertEqual(add_or_update.call_args[0][0].id, root.id)
mock_backend.remove.assert_called_once_with(version.pk)
mock_backend.add_or_update.assert_called_once()
self.assertEqual(mock_backend.add_or_update.call_args[0][0].id, root.id)
def test_delete_version_returns_403_without_permission(self) -> None:
owner = User.objects.create_user(username="owner")
+204 -217
View File
@@ -2,6 +2,7 @@ import datetime
from datetime import timedelta
from unittest import mock
import pytest
from dateutil.relativedelta import relativedelta
from django.contrib.auth.models import Group
from django.contrib.auth.models import Permission
@@ -11,9 +12,7 @@ from django.utils import timezone
from guardian.shortcuts import assign_perm
from rest_framework import status
from rest_framework.test import APITestCase
from whoosh.writing import AsyncWriter
from documents import index
from documents.bulk_edit import set_permissions
from documents.models import Correspondent
from documents.models import CustomField
@@ -25,18 +24,27 @@ from documents.models import SavedView
from documents.models import StoragePath
from documents.models import Tag
from documents.models import Workflow
from documents.search import get_backend
from documents.search import reset_backend
from documents.tests.utils import DirectoriesMixin
from paperless_mail.models import MailAccount
from paperless_mail.models import MailRule
pytestmark = pytest.mark.search
class TestDocumentSearchApi(DirectoriesMixin, APITestCase):
def setUp(self) -> None:
super().setUp()
reset_backend()
self.user = User.objects.create_superuser(username="temp_admin")
self.client.force_authenticate(user=self.user)
def tearDown(self) -> None:
reset_backend()
super().tearDown()
def test_search(self) -> None:
d1 = Document.objects.create(
title="invoice",
@@ -57,13 +65,11 @@ class TestDocumentSearchApi(DirectoriesMixin, APITestCase):
checksum="C",
original_filename="someepdf.pdf",
)
with AsyncWriter(index.open_index()) as writer:
# Note to future self: there is a reason we dont use a model signal handler to update the index: some operations edit many documents at once
# (retagger, renamer) and we don't want to open a writer for each of these, but rather perform the entire operation with one writer.
# That's why we can't open the writer in a model on_save handler or something.
index.update_document(writer, d1)
index.update_document(writer, d2)
index.update_document(writer, d3)
backend = get_backend()
backend.add_or_update(d1)
backend.add_or_update(d2)
backend.add_or_update(d3)
response = self.client.get("/api/documents/?query=bank")
results = response.data["results"]
self.assertEqual(response.data["count"], 3)
@@ -98,9 +104,9 @@ class TestDocumentSearchApi(DirectoriesMixin, APITestCase):
checksum="B",
pk=2,
)
with AsyncWriter(index.open_index()) as writer:
index.update_document(writer, d1)
index.update_document(writer, d2)
backend = get_backend()
backend.add_or_update(d1)
backend.add_or_update(d2)
response = self.client.get(
"/api/documents/?query=bank",
@@ -127,8 +133,7 @@ class TestDocumentSearchApi(DirectoriesMixin, APITestCase):
)
matching_doc.tags.add(tag)
with AsyncWriter(index.open_index()) as writer:
index.update_document(writer, matching_doc)
get_backend().add_or_update(matching_doc)
response = self.client.get(
"/api/documents/?query=bank&include_selection_data=true",
@@ -187,10 +192,10 @@ class TestDocumentSearchApi(DirectoriesMixin, APITestCase):
value_int=20,
)
with AsyncWriter(index.open_index()) as writer:
index.update_document(writer, d1)
index.update_document(writer, d2)
index.update_document(writer, d3)
backend = get_backend()
backend.add_or_update(d1)
backend.add_or_update(d2)
backend.add_or_update(d3)
response = self.client.get(
f"/api/documents/?query=match&ordering=custom_field_{custom_field.pk}",
@@ -211,15 +216,15 @@ class TestDocumentSearchApi(DirectoriesMixin, APITestCase):
)
def test_search_multi_page(self) -> None:
with AsyncWriter(index.open_index()) as writer:
for i in range(55):
doc = Document.objects.create(
checksum=str(i),
pk=i + 1,
title=f"Document {i + 1}",
content="content",
)
index.update_document(writer, doc)
backend = get_backend()
for i in range(55):
doc = Document.objects.create(
checksum=str(i),
pk=i + 1,
title=f"Document {i + 1}",
content="content",
)
backend.add_or_update(doc)
# This is here so that we test that no document gets returned twice (might happen if the paging is not working)
seen_ids = []
@@ -246,15 +251,15 @@ class TestDocumentSearchApi(DirectoriesMixin, APITestCase):
seen_ids.append(result["id"])
def test_search_invalid_page(self) -> None:
with AsyncWriter(index.open_index()) as writer:
for i in range(15):
doc = Document.objects.create(
checksum=str(i),
pk=i + 1,
title=f"Document {i + 1}",
content="content",
)
index.update_document(writer, doc)
backend = get_backend()
for i in range(15):
doc = Document.objects.create(
checksum=str(i),
pk=i + 1,
title=f"Document {i + 1}",
content="content",
)
backend.add_or_update(doc)
response = self.client.get("/api/documents/?query=content&page=0&page_size=10")
self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
@@ -292,26 +297,25 @@ class TestDocumentSearchApi(DirectoriesMixin, APITestCase):
pk=3,
checksum="C",
)
with index.open_index_writer() as writer:
index.update_document(writer, d1)
index.update_document(writer, d2)
index.update_document(writer, d3)
backend = get_backend()
backend.add_or_update(d1)
backend.add_or_update(d2)
backend.add_or_update(d3)
response = self.client.get("/api/documents/?query=added:[-1 week to now]")
results = response.data["results"]
# Expect 3 documents returned
self.assertEqual(len(results), 3)
for idx, subset in enumerate(
[
{"id": 1, "title": "invoice"},
{"id": 2, "title": "bank statement 1"},
{"id": 3, "title": "bank statement 3"},
],
):
result = results[idx]
# Assert subset in results
self.assertDictEqual(result, {**result, **subset})
result_map = {r["id"]: r for r in results}
self.assertEqual(set(result_map.keys()), {1, 2, 3})
for subset in [
{"id": 1, "title": "invoice"},
{"id": 2, "title": "bank statement 1"},
{"id": 3, "title": "bank statement 3"},
]:
r = result_map[subset["id"]]
self.assertDictEqual(r, {**r, **subset})
@override_settings(
TIME_ZONE="America/Chicago",
@@ -347,10 +351,10 @@ class TestDocumentSearchApi(DirectoriesMixin, APITestCase):
# 7 days, 1 hour and 1 minute ago
added=timezone.now() - timedelta(days=7, hours=1, minutes=1),
)
with index.open_index_writer() as writer:
index.update_document(writer, d1)
index.update_document(writer, d2)
index.update_document(writer, d3)
backend = get_backend()
backend.add_or_update(d1)
backend.add_or_update(d2)
backend.add_or_update(d3)
response = self.client.get("/api/documents/?query=added:[-1 week to now]")
results = response.data["results"]
@@ -358,12 +362,14 @@ class TestDocumentSearchApi(DirectoriesMixin, APITestCase):
# Expect 2 documents returned
self.assertEqual(len(results), 2)
for idx, subset in enumerate(
[{"id": 1, "title": "invoice"}, {"id": 2, "title": "bank statement 1"}],
):
result = results[idx]
# Assert subset in results
self.assertDictEqual(result, {**result, **subset})
result_map = {r["id"]: r for r in results}
self.assertEqual(set(result_map.keys()), {1, 2})
for subset in [
{"id": 1, "title": "invoice"},
{"id": 2, "title": "bank statement 1"},
]:
r = result_map[subset["id"]]
self.assertDictEqual(r, {**r, **subset})
@override_settings(
TIME_ZONE="Europe/Sofia",
@@ -399,10 +405,10 @@ class TestDocumentSearchApi(DirectoriesMixin, APITestCase):
# 7 days, 1 hour and 1 minute ago
added=timezone.now() - timedelta(days=7, hours=1, minutes=1),
)
with index.open_index_writer() as writer:
index.update_document(writer, d1)
index.update_document(writer, d2)
index.update_document(writer, d3)
backend = get_backend()
backend.add_or_update(d1)
backend.add_or_update(d2)
backend.add_or_update(d3)
response = self.client.get("/api/documents/?query=added:[-1 week to now]")
results = response.data["results"]
@@ -410,12 +416,14 @@ class TestDocumentSearchApi(DirectoriesMixin, APITestCase):
# Expect 2 documents returned
self.assertEqual(len(results), 2)
for idx, subset in enumerate(
[{"id": 1, "title": "invoice"}, {"id": 2, "title": "bank statement 1"}],
):
result = results[idx]
# Assert subset in results
self.assertDictEqual(result, {**result, **subset})
result_map = {r["id"]: r for r in results}
self.assertEqual(set(result_map.keys()), {1, 2})
for subset in [
{"id": 1, "title": "invoice"},
{"id": 2, "title": "bank statement 1"},
]:
r = result_map[subset["id"]]
self.assertDictEqual(r, {**r, **subset})
def test_search_added_in_last_month(self) -> None:
"""
@@ -451,10 +459,10 @@ class TestDocumentSearchApi(DirectoriesMixin, APITestCase):
added=timezone.now() - timedelta(days=7, hours=1, minutes=1),
)
with index.open_index_writer() as writer:
index.update_document(writer, d1)
index.update_document(writer, d2)
index.update_document(writer, d3)
backend = get_backend()
backend.add_or_update(d1)
backend.add_or_update(d2)
backend.add_or_update(d3)
response = self.client.get("/api/documents/?query=added:[-1 month to now]")
results = response.data["results"]
@@ -462,12 +470,14 @@ class TestDocumentSearchApi(DirectoriesMixin, APITestCase):
# Expect 2 documents returned
self.assertEqual(len(results), 2)
for idx, subset in enumerate(
[{"id": 1, "title": "invoice"}, {"id": 3, "title": "bank statement 3"}],
):
result = results[idx]
# Assert subset in results
self.assertDictEqual(result, {**result, **subset})
result_map = {r["id"]: r for r in results}
self.assertEqual(set(result_map.keys()), {1, 3})
for subset in [
{"id": 1, "title": "invoice"},
{"id": 3, "title": "bank statement 3"},
]:
r = result_map[subset["id"]]
self.assertDictEqual(r, {**r, **subset})
@override_settings(
TIME_ZONE="America/Denver",
@@ -507,10 +517,10 @@ class TestDocumentSearchApi(DirectoriesMixin, APITestCase):
added=timezone.now() - timedelta(days=7, hours=1, minutes=1),
)
with index.open_index_writer() as writer:
index.update_document(writer, d1)
index.update_document(writer, d2)
index.update_document(writer, d3)
backend = get_backend()
backend.add_or_update(d1)
backend.add_or_update(d2)
backend.add_or_update(d3)
response = self.client.get("/api/documents/?query=added:[-1 month to now]")
results = response.data["results"]
@@ -518,12 +528,14 @@ class TestDocumentSearchApi(DirectoriesMixin, APITestCase):
# Expect 2 documents returned
self.assertEqual(len(results), 2)
for idx, subset in enumerate(
[{"id": 1, "title": "invoice"}, {"id": 3, "title": "bank statement 3"}],
):
result = results[idx]
# Assert subset in results
self.assertDictEqual(result, {**result, **subset})
result_map = {r["id"]: r for r in results}
self.assertEqual(set(result_map.keys()), {1, 3})
for subset in [
{"id": 1, "title": "invoice"},
{"id": 3, "title": "bank statement 3"},
]:
r = result_map[subset["id"]]
self.assertDictEqual(r, {**r, **subset})
@override_settings(
TIME_ZONE="Europe/Sofia",
@@ -563,10 +575,10 @@ class TestDocumentSearchApi(DirectoriesMixin, APITestCase):
# Django converts dates to UTC
d3.refresh_from_db()
with index.open_index_writer() as writer:
index.update_document(writer, d1)
index.update_document(writer, d2)
index.update_document(writer, d3)
backend = get_backend()
backend.add_or_update(d1)
backend.add_or_update(d2)
backend.add_or_update(d3)
response = self.client.get("/api/documents/?query=added:20231201")
results = response.data["results"]
@@ -574,12 +586,8 @@ class TestDocumentSearchApi(DirectoriesMixin, APITestCase):
# Expect 1 document returned
self.assertEqual(len(results), 1)
for idx, subset in enumerate(
[{"id": 3, "title": "bank statement 3"}],
):
result = results[idx]
# Assert subset in results
self.assertDictEqual(result, {**result, **subset})
self.assertEqual(results[0]["id"], 3)
self.assertEqual(results[0]["title"], "bank statement 3")
def test_search_added_invalid_date(self) -> None:
"""
@@ -588,7 +596,7 @@ class TestDocumentSearchApi(DirectoriesMixin, APITestCase):
WHEN:
- Query with invalid added date
THEN:
- No documents returned
- 400 Bad Request returned (Tantivy rejects invalid date field syntax)
"""
d1 = Document.objects.create(
title="invoice",
@@ -597,16 +605,14 @@ class TestDocumentSearchApi(DirectoriesMixin, APITestCase):
pk=1,
)
with index.open_index_writer() as writer:
index.update_document(writer, d1)
get_backend().add_or_update(d1)
response = self.client.get("/api/documents/?query=added:invalid-date")
results = response.data["results"]
# Expect 0 document returned
self.assertEqual(len(results), 0)
# Tantivy rejects unparsable field queries with a 400
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
@mock.patch("documents.index.autocomplete")
@mock.patch("documents.search._backend.TantivyBackend.autocomplete")
def test_search_autocomplete_limits(self, m) -> None:
"""
GIVEN:
@@ -618,7 +624,7 @@ class TestDocumentSearchApi(DirectoriesMixin, APITestCase):
- Limit requests are obeyed
"""
m.side_effect = lambda ix, term, limit, user: [term for _ in range(limit)]
m.side_effect = lambda term, limit, user=None: [term for _ in range(limit)]
response = self.client.get("/api/search/autocomplete/?term=test")
self.assertEqual(response.status_code, status.HTTP_200_OK)
@@ -671,32 +677,29 @@ class TestDocumentSearchApi(DirectoriesMixin, APITestCase):
owner=u1,
)
with AsyncWriter(index.open_index()) as writer:
index.update_document(writer, d1)
index.update_document(writer, d2)
index.update_document(writer, d3)
backend = get_backend()
backend.add_or_update(d1)
backend.add_or_update(d2)
backend.add_or_update(d3)
response = self.client.get("/api/search/autocomplete/?term=app")
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(response.data, [b"apples", b"applebaum", b"appletini"])
self.assertEqual(response.data, ["applebaum", "apples", "appletini"])
d3.owner = u2
with AsyncWriter(index.open_index()) as writer:
index.update_document(writer, d3)
d3.save()
backend.add_or_update(d3)
response = self.client.get("/api/search/autocomplete/?term=app")
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(response.data, [b"apples", b"applebaum"])
self.assertEqual(response.data, ["applebaum", "apples"])
assign_perm("view_document", u1, d3)
with AsyncWriter(index.open_index()) as writer:
index.update_document(writer, d3)
backend.add_or_update(d3)
response = self.client.get("/api/search/autocomplete/?term=app")
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(response.data, [b"apples", b"applebaum", b"appletini"])
self.assertEqual(response.data, ["applebaum", "apples", "appletini"])
def test_search_autocomplete_field_name_match(self) -> None:
"""
@@ -714,8 +717,7 @@ class TestDocumentSearchApi(DirectoriesMixin, APITestCase):
checksum="1",
)
with AsyncWriter(index.open_index()) as writer:
index.update_document(writer, d1)
get_backend().add_or_update(d1)
response = self.client.get("/api/search/autocomplete/?term=created:2023")
self.assertEqual(response.status_code, status.HTTP_200_OK)
@@ -736,33 +738,36 @@ class TestDocumentSearchApi(DirectoriesMixin, APITestCase):
checksum="1",
)
with AsyncWriter(index.open_index()) as writer:
index.update_document(writer, d1)
get_backend().add_or_update(d1)
response = self.client.get("/api/search/autocomplete/?term=auto")
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(response.data[0], b"auto")
self.assertEqual(response.data[0], "auto")
def test_search_spelling_suggestion(self) -> None:
with AsyncWriter(index.open_index()) as writer:
for i in range(55):
doc = Document.objects.create(
checksum=str(i),
pk=i + 1,
title=f"Document {i + 1}",
content=f"Things document {i + 1}",
)
index.update_document(writer, doc)
def test_search_no_spelling_suggestion(self) -> None:
"""
GIVEN:
- Documents exist with various terms
WHEN:
- Query for documents with any term
THEN:
- corrected_query is always None (Tantivy has no spell correction)
"""
backend = get_backend()
for i in range(5):
doc = Document.objects.create(
checksum=str(i),
pk=i + 1,
title=f"Document {i + 1}",
content=f"Things document {i + 1}",
)
backend.add_or_update(doc)
response = self.client.get("/api/documents/?query=thing")
correction = response.data["corrected_query"]
self.assertEqual(correction, "things")
self.assertIsNone(response.data["corrected_query"])
response = self.client.get("/api/documents/?query=things")
correction = response.data["corrected_query"]
self.assertEqual(correction, None)
self.assertIsNone(response.data["corrected_query"])
def test_search_spelling_suggestion_suppressed_for_private_terms(self):
owner = User.objects.create_user("owner")
@@ -771,24 +776,24 @@ class TestDocumentSearchApi(DirectoriesMixin, APITestCase):
Permission.objects.get(codename="view_document"),
)
with AsyncWriter(index.open_index()) as writer:
for i in range(55):
private_doc = Document.objects.create(
checksum=f"p{i}",
pk=100 + i,
title=f"Private Document {i + 1}",
content=f"treasury document {i + 1}",
owner=owner,
)
visible_doc = Document.objects.create(
checksum=f"v{i}",
pk=200 + i,
title=f"Visible Document {i + 1}",
content=f"public ledger {i + 1}",
owner=attacker,
)
index.update_document(writer, private_doc)
index.update_document(writer, visible_doc)
backend = get_backend()
for i in range(5):
private_doc = Document.objects.create(
checksum=f"p{i}",
pk=100 + i,
title=f"Private Document {i + 1}",
content=f"treasury document {i + 1}",
owner=owner,
)
visible_doc = Document.objects.create(
checksum=f"v{i}",
pk=200 + i,
title=f"Visible Document {i + 1}",
content=f"public ledger {i + 1}",
owner=attacker,
)
backend.add_or_update(private_doc)
backend.add_or_update(visible_doc)
self.client.force_authenticate(user=attacker)
@@ -798,26 +803,6 @@ class TestDocumentSearchApi(DirectoriesMixin, APITestCase):
self.assertEqual(response.data["count"], 0)
self.assertIsNone(response.data["corrected_query"])
@mock.patch(
"whoosh.searching.Searcher.correct_query",
side_effect=Exception("Test error"),
)
def test_corrected_query_error(self, mock_correct_query) -> None:
"""
GIVEN:
- A query that raises an error on correction
WHEN:
- API request for search with that query
THEN:
- The error is logged and the search proceeds
"""
with self.assertLogs("paperless.index", level="INFO") as cm:
response = self.client.get("/api/documents/?query=2025-06-04")
self.assertEqual(response.status_code, status.HTTP_200_OK)
error_str = cm.output[0]
expected_str = "Error while correcting query '2025-06-04': Test error"
self.assertIn(expected_str, error_str)
def test_search_more_like(self) -> None:
"""
GIVEN:
@@ -847,16 +832,16 @@ class TestDocumentSearchApi(DirectoriesMixin, APITestCase):
checksum="C",
)
d4 = Document.objects.create(
title="Monty Python & the Holy Grail",
content="And now for something completely different",
title="Quarterly Report",
content="quarterly revenue profit margin earnings growth",
pk=4,
checksum="ABC",
)
with AsyncWriter(index.open_index()) as writer:
index.update_document(writer, d1)
index.update_document(writer, d2)
index.update_document(writer, d3)
index.update_document(writer, d4)
backend = get_backend()
backend.add_or_update(d1)
backend.add_or_update(d2)
backend.add_or_update(d3)
backend.add_or_update(d4)
response = self.client.get(f"/api/documents/?more_like_id={d2.id}")
@@ -864,9 +849,10 @@ class TestDocumentSearchApi(DirectoriesMixin, APITestCase):
results = response.data["results"]
self.assertEqual(len(results), 2)
self.assertEqual(results[0]["id"], d3.id)
self.assertEqual(results[1]["id"], d1.id)
self.assertGreaterEqual(len(results), 1)
result_ids = [r["id"] for r in results]
self.assertIn(d3.id, result_ids)
self.assertNotIn(d4.id, result_ids)
def test_search_more_like_requires_view_permission_on_seed_document(
self,
@@ -908,10 +894,10 @@ class TestDocumentSearchApi(DirectoriesMixin, APITestCase):
pk=12,
)
with AsyncWriter(index.open_index()) as writer:
index.update_document(writer, private_seed)
index.update_document(writer, visible_doc)
index.update_document(writer, other_doc)
backend = get_backend()
backend.add_or_update(private_seed)
backend.add_or_update(visible_doc)
backend.add_or_update(other_doc)
self.client.force_authenticate(user=attacker)
@@ -985,9 +971,9 @@ class TestDocumentSearchApi(DirectoriesMixin, APITestCase):
value_text="foobard4",
)
with AsyncWriter(index.open_index()) as writer:
for doc in Document.objects.all():
index.update_document(writer, doc)
backend = get_backend()
for doc in Document.objects.all():
backend.add_or_update(doc)
def search_query(q):
r = self.client.get("/api/documents/?query=test" + q)
@@ -1203,9 +1189,9 @@ class TestDocumentSearchApi(DirectoriesMixin, APITestCase):
Document.objects.create(checksum="3", content="test 3", owner=u2)
Document.objects.create(checksum="4", content="test 4")
with AsyncWriter(index.open_index()) as writer:
for doc in Document.objects.all():
index.update_document(writer, doc)
backend = get_backend()
for doc in Document.objects.all():
backend.add_or_update(doc)
self.client.force_authenticate(user=u1)
r = self.client.get("/api/documents/?query=test")
@@ -1256,9 +1242,9 @@ class TestDocumentSearchApi(DirectoriesMixin, APITestCase):
d3 = Document.objects.create(checksum="3", content="test 3", owner=u2)
Document.objects.create(checksum="4", content="test 4")
with AsyncWriter(index.open_index()) as writer:
for doc in Document.objects.all():
index.update_document(writer, doc)
backend = get_backend()
for doc in Document.objects.all():
backend.add_or_update(doc)
self.client.force_authenticate(user=u1)
r = self.client.get("/api/documents/?query=test")
@@ -1278,9 +1264,9 @@ class TestDocumentSearchApi(DirectoriesMixin, APITestCase):
assign_perm("view_document", u1, d3)
assign_perm("view_document", u2, d1)
with AsyncWriter(index.open_index()) as writer:
for doc in [d1, d2, d3]:
index.update_document(writer, doc)
backend.add_or_update(d1)
backend.add_or_update(d2)
backend.add_or_update(d3)
self.client.force_authenticate(user=u1)
r = self.client.get("/api/documents/?query=test")
@@ -1343,9 +1329,9 @@ class TestDocumentSearchApi(DirectoriesMixin, APITestCase):
user=u1,
)
with AsyncWriter(index.open_index()) as writer:
for doc in Document.objects.all():
index.update_document(writer, doc)
backend = get_backend()
for doc in Document.objects.all():
backend.add_or_update(doc)
def search_query(q):
r = self.client.get("/api/documents/?query=test" + q)
@@ -1378,13 +1364,14 @@ class TestDocumentSearchApi(DirectoriesMixin, APITestCase):
search_query("&ordering=-num_notes"),
[d1.id, d3.id, d2.id],
)
# owner sort: ORM orders by owner_id (integer); NULLs first in SQLite ASC
self.assertListEqual(
search_query("&ordering=owner"),
[d1.id, d2.id, d3.id],
[d3.id, d1.id, d2.id],
)
self.assertListEqual(
search_query("&ordering=-owner"),
[d3.id, d2.id, d1.id],
[d2.id, d1.id, d3.id],
)
@mock.patch("documents.bulk_edit.bulk_update_documents")
@@ -1441,12 +1428,12 @@ class TestDocumentSearchApi(DirectoriesMixin, APITestCase):
)
set_permissions([4, 5], set_permissions={}, owner=user2, merge=False)
with index.open_index_writer() as writer:
index.update_document(writer, d1)
index.update_document(writer, d2)
index.update_document(writer, d3)
index.update_document(writer, d4)
index.update_document(writer, d5)
backend = get_backend()
backend.add_or_update(d1)
backend.add_or_update(d2)
backend.add_or_update(d3)
backend.add_or_update(d4)
backend.add_or_update(d5)
correspondent1 = Correspondent.objects.create(name="bank correspondent 1")
Correspondent.objects.create(name="correspondent 2")
+16 -14
View File
@@ -191,40 +191,42 @@ class TestSystemStatus(APITestCase):
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(response.data["tasks"]["celery_status"], "OK")
@override_settings(INDEX_DIR=Path("/tmp/index"))
@mock.patch("whoosh.index.FileIndex.last_modified")
def test_system_status_index_ok(self, mock_last_modified) -> None:
@mock.patch("documents.search.get_backend")
def test_system_status_index_ok(self, mock_get_backend) -> None:
"""
GIVEN:
- The index last modified time is set
- The index is accessible
WHEN:
- The user requests the system status
THEN:
- The response contains the correct index status
"""
mock_last_modified.return_value = 1707839087
self.client.force_login(self.user)
response = self.client.get(self.ENDPOINT)
mock_get_backend.return_value = mock.MagicMock()
# Use the temp dir created in setUp (self.tmp_dir) as a real INDEX_DIR
# with a real file so the mtime lookup works
sentinel = self.tmp_dir / "sentinel.txt"
sentinel.write_text("ok")
with self.settings(INDEX_DIR=self.tmp_dir):
self.client.force_login(self.user)
response = self.client.get(self.ENDPOINT)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(response.data["tasks"]["index_status"], "OK")
self.assertIsNotNone(response.data["tasks"]["index_last_modified"])
@override_settings(INDEX_DIR=Path("/tmp/index/"))
@mock.patch("documents.index.open_index", autospec=True)
def test_system_status_index_error(self, mock_open_index) -> None:
@mock.patch("documents.search.get_backend")
def test_system_status_index_error(self, mock_get_backend) -> None:
"""
GIVEN:
- The index is not found
- The index cannot be opened
WHEN:
- The user requests the system status
THEN:
- The response contains the correct index status
"""
mock_open_index.return_value = None
mock_open_index.side_effect = Exception("Index error")
mock_get_backend.side_effect = Exception("Index error")
self.client.force_login(self.user)
response = self.client.get(self.ENDPOINT)
mock_open_index.assert_called_once()
mock_get_backend.assert_called_once()
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(response.data["tasks"]["index_status"], "ERROR")
self.assertIsNotNone(response.data["tasks"]["index_error"])
-58
View File
@@ -1,58 +0,0 @@
from django.test import TestCase
from whoosh import query
from documents.index import get_permissions_criterias
from documents.models import User
class TestDelayedQuery(TestCase):
def setUp(self) -> None:
super().setUp()
# all tests run without permission criteria, so has_no_owner query will always
# be appended.
self.has_no_owner = query.Or([query.Term("has_owner", text=False)])
def _get_testset__id__in(self, param, field):
return (
{f"{param}__id__in": "42,43"},
query.And(
[
query.Or(
[
query.Term(f"{field}_id", "42"),
query.Term(f"{field}_id", "43"),
],
),
self.has_no_owner,
],
),
)
def _get_testset__id__none(self, param, field):
return (
{f"{param}__id__none": "42,43"},
query.And(
[
query.Not(query.Term(f"{field}_id", "42")),
query.Not(query.Term(f"{field}_id", "43")),
self.has_no_owner,
],
),
)
def test_get_permission_criteria(self) -> None:
# tests contains tuples of user instances and the expected filter
tests = (
(None, [query.Term("has_owner", text=False)]),
(User(42, username="foo", is_superuser=True), []),
(
User(42, username="foo", is_superuser=False),
[
query.Term("has_owner", text=False),
query.Term("owner_id", 42),
query.Term("viewer_id", "42"),
],
),
)
for user, expected in tests:
self.assertEqual(get_permissions_criterias(user), expected)
-371
View File
@@ -1,371 +0,0 @@
from datetime import datetime
from unittest import mock
from django.conf import settings
from django.contrib.auth.models import User
from django.test import SimpleTestCase
from django.test import TestCase
from django.test import override_settings
from django.utils.timezone import get_current_timezone
from django.utils.timezone import timezone
from documents import index
from documents.models import Document
from documents.tests.utils import DirectoriesMixin
class TestAutoComplete(DirectoriesMixin, TestCase):
def test_auto_complete(self) -> None:
doc1 = Document.objects.create(
title="doc1",
checksum="A",
content="test test2 test3",
)
doc2 = Document.objects.create(title="doc2", checksum="B", content="test test2")
doc3 = Document.objects.create(title="doc3", checksum="C", content="test2")
index.add_or_update_document(doc1)
index.add_or_update_document(doc2)
index.add_or_update_document(doc3)
ix = index.open_index()
self.assertListEqual(
index.autocomplete(ix, "tes"),
[b"test2", b"test", b"test3"],
)
self.assertListEqual(
index.autocomplete(ix, "tes", limit=3),
[b"test2", b"test", b"test3"],
)
self.assertListEqual(index.autocomplete(ix, "tes", limit=1), [b"test2"])
self.assertListEqual(index.autocomplete(ix, "tes", limit=0), [])
def test_archive_serial_number_ranging(self) -> None:
"""
GIVEN:
- Document with an archive serial number above schema allowed size
WHEN:
- Document is provided to the index
THEN:
- Error is logged
- Document ASN is reset to 0 for the index
"""
doc1 = Document.objects.create(
title="doc1",
checksum="A",
content="test test2 test3",
# yes, this is allowed, unless full_clean is run
# DRF does call the validators, this test won't
archive_serial_number=Document.ARCHIVE_SERIAL_NUMBER_MAX + 1,
)
with self.assertLogs("paperless.index", level="ERROR") as cm:
with mock.patch(
"documents.index.AsyncWriter.update_document",
) as mocked_update_doc:
index.add_or_update_document(doc1)
mocked_update_doc.assert_called_once()
_, kwargs = mocked_update_doc.call_args
self.assertEqual(kwargs["asn"], 0)
error_str = cm.output[0]
expected_str = "ERROR:paperless.index:Not indexing Archive Serial Number 4294967296 of document 1"
self.assertIn(expected_str, error_str)
def test_archive_serial_number_is_none(self) -> None:
"""
GIVEN:
- Document with no archive serial number
WHEN:
- Document is provided to the index
THEN:
- ASN isn't touched
"""
doc1 = Document.objects.create(
title="doc1",
checksum="A",
content="test test2 test3",
)
with mock.patch(
"documents.index.AsyncWriter.update_document",
) as mocked_update_doc:
index.add_or_update_document(doc1)
mocked_update_doc.assert_called_once()
_, kwargs = mocked_update_doc.call_args
self.assertIsNone(kwargs["asn"])
@override_settings(TIME_ZONE="Pacific/Auckland")
def test_added_today_respects_local_timezone_boundary(self) -> None:
tz = get_current_timezone()
fixed_now = datetime(2025, 7, 20, 15, 0, 0, tzinfo=tz)
# Fake a time near the local boundary (1 AM NZT = 13:00 UTC on previous UTC day)
local_dt = datetime(2025, 7, 20, 1, 0, 0).replace(tzinfo=tz)
utc_dt = local_dt.astimezone(timezone.utc)
doc = Document.objects.create(
title="Time zone",
content="Testing added:today",
checksum="edgecase123",
added=utc_dt,
)
with index.open_index_writer() as writer:
index.update_document(writer, doc)
superuser = User.objects.create_superuser(username="testuser")
self.client.force_login(superuser)
with mock.patch("documents.index.now", return_value=fixed_now):
response = self.client.get("/api/documents/?query=added:today")
results = response.json()["results"]
self.assertEqual(len(results), 1)
self.assertEqual(results[0]["id"], doc.id)
response = self.client.get("/api/documents/?query=added:yesterday")
results = response.json()["results"]
self.assertEqual(len(results), 0)
@override_settings(TIME_ZONE="UTC")
class TestRewriteNaturalDateKeywords(SimpleTestCase):
"""
Unit tests for rewrite_natural_date_keywords function.
"""
def _rewrite_with_now(self, query: str, now_dt: datetime) -> str:
with mock.patch("documents.index.now", return_value=now_dt):
return index.rewrite_natural_date_keywords(query)
def _assert_rewrite_contains(
self,
query: str,
now_dt: datetime,
*expected_fragments: str,
) -> str:
result = self._rewrite_with_now(query, now_dt)
for fragment in expected_fragments:
self.assertIn(fragment, result)
return result
def test_range_keywords(self) -> None:
"""
Test various different range keywords
"""
cases = [
(
"added:today",
datetime(2025, 7, 20, 15, 30, 45, tzinfo=timezone.utc),
("added:[20250720", "TO 20250720"),
),
(
"added:yesterday",
datetime(2025, 7, 20, 15, 30, 45, tzinfo=timezone.utc),
("added:[20250719", "TO 20250719"),
),
(
"added:this month",
datetime(2025, 7, 15, 12, 0, 0, tzinfo=timezone.utc),
("added:[20250701", "TO 20250731"),
),
(
"added:previous month",
datetime(2025, 7, 15, 12, 0, 0, tzinfo=timezone.utc),
("added:[20250601", "TO 20250630"),
),
(
"added:this year",
datetime(2025, 7, 15, 12, 0, 0, tzinfo=timezone.utc),
("added:[20250101", "TO 20251231"),
),
(
"added:previous year",
datetime(2025, 7, 15, 12, 0, 0, tzinfo=timezone.utc),
("added:[20240101", "TO 20241231"),
),
# Previous quarter from July 15, 2025 is April-June.
(
"added:previous quarter",
datetime(2025, 7, 15, 12, 0, 0, tzinfo=timezone.utc),
("added:[20250401", "TO 20250630"),
),
# July 20, 2025 is a Sunday (weekday 6) so previous week is July 7-13.
(
"added:previous week",
datetime(2025, 7, 20, 12, 0, 0, tzinfo=timezone.utc),
("added:[20250707", "TO 20250713"),
),
]
for query, now_dt, fragments in cases:
with self.subTest(query=query):
self._assert_rewrite_contains(query, now_dt, *fragments)
def test_additional_fields(self) -> None:
fixed_now = datetime(2025, 7, 20, 15, 30, 45, tzinfo=timezone.utc)
# created
self._assert_rewrite_contains("created:today", fixed_now, "created:[20250720")
# modified
self._assert_rewrite_contains("modified:today", fixed_now, "modified:[20250720")
def test_basic_syntax_variants(self) -> None:
"""
Test that quoting, casing, and multi-clause queries are parsed.
"""
fixed_now = datetime(2025, 7, 20, 15, 30, 45, tzinfo=timezone.utc)
# quoted keywords
result1 = self._rewrite_with_now('added:"today"', fixed_now)
result2 = self._rewrite_with_now("added:'today'", fixed_now)
self.assertIn("added:[20250720", result1)
self.assertIn("added:[20250720", result2)
# case insensitivity
for query in ("added:TODAY", "added:Today", "added:ToDaY"):
with self.subTest(case_variant=query):
self._assert_rewrite_contains(query, fixed_now, "added:[20250720")
# multiple clauses
result = self._rewrite_with_now("added:today created:yesterday", fixed_now)
self.assertIn("added:[20250720", result)
self.assertIn("created:[20250719", result)
def test_no_match(self) -> None:
"""
Test that queries without keywords are unchanged.
"""
query = "title:test content:example"
result = index.rewrite_natural_date_keywords(query)
self.assertEqual(query, result)
@override_settings(TIME_ZONE="Pacific/Auckland")
def test_timezone_awareness(self) -> None:
"""
Test timezone conversion.
"""
# July 20, 2025 1:00 AM NZST = July 19, 2025 13:00 UTC
fixed_now = datetime(2025, 7, 20, 1, 0, 0, tzinfo=get_current_timezone())
result = self._rewrite_with_now("added:today", fixed_now)
# Should convert to UTC properly
self.assertIn("added:[20250719", result)
class TestIndexResilience(DirectoriesMixin, SimpleTestCase):
def _assert_recreate_called(self, mock_create_in) -> None:
mock_create_in.assert_called_once()
path_arg, schema_arg = mock_create_in.call_args.args
self.assertEqual(path_arg, settings.INDEX_DIR)
self.assertEqual(schema_arg.__class__.__name__, "Schema")
def test_transient_missing_segment_does_not_force_recreate(self) -> None:
"""
GIVEN:
- Index directory exists
WHEN:
- open_index is called
- Opening the index raises FileNotFoundError once due to a
transient missing segment
THEN:
- Index is opened successfully on retry
- Index is not recreated
"""
file_marker = settings.INDEX_DIR / "file_marker.txt"
file_marker.write_text("keep")
expected_index = object()
with (
mock.patch("documents.index.exists_in", return_value=True),
mock.patch(
"documents.index.open_dir",
side_effect=[FileNotFoundError("missing"), expected_index],
) as mock_open_dir,
mock.patch(
"documents.index.create_in",
) as mock_create_in,
mock.patch(
"documents.index.rmtree",
) as mock_rmtree,
):
ix = index.open_index()
self.assertIs(ix, expected_index)
self.assertGreaterEqual(mock_open_dir.call_count, 2)
mock_rmtree.assert_not_called()
mock_create_in.assert_not_called()
self.assertEqual(file_marker.read_text(), "keep")
def test_transient_errors_exhaust_retries_and_recreate(self) -> None:
"""
GIVEN:
- Index directory exists
WHEN:
- open_index is called
- Opening the index raises FileNotFoundError multiple times due to
transient missing segments
THEN:
- Index is recreated after retries are exhausted
"""
recreated_index = object()
with (
self.assertLogs("paperless.index", level="ERROR") as cm,
mock.patch("documents.index.exists_in", return_value=True),
mock.patch(
"documents.index.open_dir",
side_effect=FileNotFoundError("missing"),
) as mock_open_dir,
mock.patch("documents.index.rmtree") as mock_rmtree,
mock.patch(
"documents.index.create_in",
return_value=recreated_index,
) as mock_create_in,
):
ix = index.open_index()
self.assertIs(ix, recreated_index)
self.assertEqual(mock_open_dir.call_count, 4)
mock_rmtree.assert_called_once_with(settings.INDEX_DIR)
self._assert_recreate_called(mock_create_in)
self.assertIn(
"Error while opening the index after retries, recreating.",
cm.output[0],
)
def test_non_transient_error_recreates_index(self) -> None:
"""
GIVEN:
- Index directory exists
WHEN:
- open_index is called
- Opening the index raises a "non-transient" error
THEN:
- Index is recreated
"""
recreated_index = object()
with (
self.assertLogs("paperless.index", level="ERROR") as cm,
mock.patch("documents.index.exists_in", return_value=True),
mock.patch(
"documents.index.open_dir",
side_effect=RuntimeError("boom"),
),
mock.patch("documents.index.rmtree") as mock_rmtree,
mock.patch(
"documents.index.create_in",
return_value=recreated_index,
) as mock_create_in,
):
ix = index.open_index()
self.assertIs(ix, recreated_index)
mock_rmtree.assert_called_once_with(settings.INDEX_DIR)
self._assert_recreate_called(mock_create_in)
self.assertIn(
"Error while opening the index, recreating.",
cm.output[0],
)
+66 -7
View File
@@ -103,16 +103,75 @@ class TestArchiver(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
@pytest.mark.management
class TestMakeIndex(TestCase):
@mock.patch("documents.management.commands.document_index.index_reindex")
def test_reindex(self, m) -> None:
@pytest.mark.django_db
class TestMakeIndex:
def test_reindex(self, mocker: MockerFixture) -> None:
"""Reindex command must call the backend rebuild method to recreate the index."""
mock_get_backend = mocker.patch(
"documents.management.commands.document_index.get_backend",
)
call_command("document_index", "reindex", skip_checks=True)
m.assert_called_once()
mock_get_backend.return_value.rebuild.assert_called_once()
@mock.patch("documents.management.commands.document_index.index_optimize")
def test_optimize(self, m) -> None:
def test_optimize(self) -> None:
"""Optimize command must execute without error (Tantivy handles optimization automatically)."""
call_command("document_index", "optimize", skip_checks=True)
m.assert_called_once()
def test_reindex_recreate_wipes_index(self, mocker: MockerFixture) -> None:
"""Reindex with --recreate must wipe the index before rebuilding."""
mock_wipe = mocker.patch(
"documents.management.commands.document_index.wipe_index",
)
mock_get_backend = mocker.patch(
"documents.management.commands.document_index.get_backend",
)
call_command("document_index", "reindex", recreate=True, skip_checks=True)
mock_wipe.assert_called_once()
mock_get_backend.return_value.rebuild.assert_called_once()
def test_reindex_without_recreate_does_not_wipe_index(
self,
mocker: MockerFixture,
) -> None:
"""Reindex without --recreate must not wipe the index."""
mock_wipe = mocker.patch(
"documents.management.commands.document_index.wipe_index",
)
mocker.patch(
"documents.management.commands.document_index.get_backend",
)
call_command("document_index", "reindex", skip_checks=True)
mock_wipe.assert_not_called()
def test_reindex_if_needed_skips_when_up_to_date(
self,
mocker: MockerFixture,
) -> None:
"""Conditional reindex must skip rebuild when schema version and language match."""
mocker.patch(
"documents.management.commands.document_index.needs_rebuild",
return_value=False,
)
mock_get_backend = mocker.patch(
"documents.management.commands.document_index.get_backend",
)
call_command("document_index", "reindex", if_needed=True, skip_checks=True)
mock_get_backend.return_value.rebuild.assert_not_called()
def test_reindex_if_needed_runs_when_rebuild_needed(
self,
mocker: MockerFixture,
) -> None:
"""Conditional reindex must proceed with rebuild when schema version or language changed."""
mocker.patch(
"documents.management.commands.document_index.needs_rebuild",
return_value=True,
)
mock_get_backend = mocker.patch(
"documents.management.commands.document_index.get_backend",
)
call_command("document_index", "reindex", if_needed=True, skip_checks=True)
mock_get_backend.return_value.rebuild.assert_called_once()
@pytest.mark.management
+6
View File
@@ -452,7 +452,10 @@ class TestDocumentConsumptionFinishedSignal(TestCase):
"""
def setUp(self) -> None:
from documents.search import reset_backend
TestCase.setUp(self)
reset_backend()
User.objects.create_user(username="test_consumer", password="12345")
self.doc_contains = Document.objects.create(
content="I contain the keyword.",
@@ -464,6 +467,9 @@ class TestDocumentConsumptionFinishedSignal(TestCase):
override_settings(INDEX_DIR=self.index_dir).enable()
def tearDown(self) -> None:
from documents.search import reset_backend
reset_backend()
shutil.rmtree(self.index_dir, ignore_errors=True)
def test_tag_applied_any(self) -> None:
+3 -1
View File
@@ -11,10 +11,12 @@ from documents.models import WorkflowAction
from documents.models import WorkflowTrigger
from documents.serialisers import TagSerializer
from documents.signals.handlers import run_workflows
from documents.tests.utils import DirectoriesMixin
class TestTagHierarchy(APITestCase):
class TestTagHierarchy(DirectoriesMixin, APITestCase):
def setUp(self) -> None:
super().setUp()
self.user = User.objects.create_superuser(username="admin")
self.client.force_authenticate(user=self.user)
+21 -9
View File
@@ -2,6 +2,7 @@ import uuid
from unittest import mock
import celery
from django.contrib.auth import get_user_model
from django.test import TestCase
from documents.data_models import ConsumableDocument
@@ -20,6 +21,11 @@ from documents.tests.utils import DirectoriesMixin
@mock.patch("documents.consumer.magic.from_file", fake_magic_from_file)
class TestTaskSignalHandler(DirectoriesMixin, TestCase):
@classmethod
def setUpTestData(cls) -> None:
super().setUpTestData()
cls.user = get_user_model().objects.create_user(username="testuser")
def util_call_before_task_publish_handler(
self,
headers_to_use,
@@ -57,7 +63,7 @@ class TestTaskSignalHandler(DirectoriesMixin, TestCase):
),
DocumentMetadataOverrides(
title="Hello world",
owner_id=1,
owner_id=self.user.id,
),
),
# kwargs
@@ -75,7 +81,7 @@ class TestTaskSignalHandler(DirectoriesMixin, TestCase):
self.assertEqual(headers["id"], task.task_id)
self.assertEqual("hello-999.pdf", task.task_file_name)
self.assertEqual(PaperlessTask.TaskName.CONSUME_FILE, task.task_name)
self.assertEqual(1, task.owner_id)
self.assertEqual(self.user.id, task.owner_id)
self.assertEqual(celery.states.PENDING, task.status)
def test_task_prerun_handler(self) -> None:
@@ -208,10 +214,12 @@ class TestTaskSignalHandler(DirectoriesMixin, TestCase):
mime_type="application/pdf",
)
with mock.patch("documents.index.add_or_update_document") as add:
with mock.patch("documents.search.get_backend") as mock_get_backend:
mock_backend = mock.MagicMock()
mock_get_backend.return_value = mock_backend
add_to_index(sender=None, document=root)
add.assert_called_once_with(root)
mock_backend.add_or_update.assert_called_once_with(root, effective_content="")
def test_add_to_index_reindexes_root_for_version_documents(self) -> None:
root = Document.objects.create(
@@ -226,13 +234,17 @@ class TestTaskSignalHandler(DirectoriesMixin, TestCase):
root_document=root,
)
with mock.patch("documents.index.add_or_update_document") as add:
with mock.patch("documents.search.get_backend") as mock_get_backend:
mock_backend = mock.MagicMock()
mock_get_backend.return_value = mock_backend
add_to_index(sender=None, document=version)
self.assertEqual(add.call_count, 2)
self.assertEqual(add.call_args_list[0].args[0].id, version.id)
self.assertEqual(add.call_args_list[1].args[0].id, root.id)
self.assertEqual(mock_backend.add_or_update.call_count, 1)
self.assertEqual(
add.call_args_list[1].kwargs,
mock_backend.add_or_update.call_args_list[0].args[0].id,
version.id,
)
self.assertEqual(
mock_backend.add_or_update.call_args_list[0].kwargs,
{"effective_content": version.content},
)
+3 -22
View File
@@ -23,29 +23,10 @@ from documents.tests.utils import DirectoriesMixin
from documents.tests.utils import FileSystemAssertsMixin
class TestIndexReindex(DirectoriesMixin, TestCase):
def test_index_reindex(self) -> None:
Document.objects.create(
title="test",
content="my document",
checksum="wow",
added=timezone.now(),
created=timezone.now(),
modified=timezone.now(),
)
tasks.index_reindex()
@pytest.mark.django_db
class TestIndexOptimize:
def test_index_optimize(self) -> None:
Document.objects.create(
title="test",
content="my document",
checksum="wow",
added=timezone.now(),
created=timezone.now(),
modified=timezone.now(),
)
"""Index optimization task must execute without error (Tantivy handles optimization automatically)."""
tasks.index_optimize()
+1
View File
@@ -4802,6 +4802,7 @@ class TestWebhookSecurity:
@pytest.mark.django_db
@pytest.mark.usefixtures("_search_index")
class TestDateWorkflowLocalization(
SampleDirMixin,
):
+6
View File
@@ -157,11 +157,17 @@ class DirectoriesMixin:
"""
def setUp(self) -> None:
from documents.search import reset_backend
reset_backend()
self.dirs = setup_directories()
super().setUp()
def tearDown(self) -> None:
from documents.search import reset_backend
super().tearDown()
reset_backend()
remove_dirs(self.dirs)