Compare commits


4 Commits

| Author | SHA1 | Message | Date |
| ------ | ---- | ------- | ---- |
| Trenton H | b4c26fb204 | Breaking: Remove the positional arguments from the pre/post consume scripts. Environment has been preferred for quite some time now | 2026-04-10 13:10:45 -07:00 |
| shamoon | fdd5e3ecb2 | Update SECURITY.md | 2026-04-10 12:34:47 -07:00 |
| shamoon | df3b656352 | Add tests | 2026-04-10 12:06:28 -07:00 |
| shamoon | 51e721733f | Enhancement: validate and sanitize uploaded logos (#12551) | 2026-04-10 11:50:58 -07:00 |
20 changed files with 1102 additions and 925 deletions

View File

@@ -2,8 +2,83 @@
## Reporting a Vulnerability
The Paperless-ngx team and community take security bugs seriously. We appreciate your efforts to responsibly disclose your findings, and will make every effort to acknowledge your contributions.
The Paperless-ngx team and community take security issues seriously. We appreciate good-faith reports and will make every effort to review legitimate findings responsibly.
To report a security issue, please use the GitHub Security Advisory ["Report a Vulnerability"](https://github.com/paperless-ngx/paperless-ngx/security/advisories/new) tab.
The team will send a response indicating the next steps in handling your report. After the initial reply to your report, the security team will keep you informed of the progress towards a fix and full announcement, and may ask for additional information or guidance.
After the initial reply to your report, the team may ask for additional information, reproduction steps, affected versions, configuration details, or proof-of-concept material needed to verify the issue.
## What makes a helpful report
Please include as much of the following as possible:
- A clear description of the issue and why it is a security vulnerability.
- Affected Paperless-ngx version(s).
- Required configuration, permissions, or preconditions.
- Step-by-step reproduction instructions.
- A realistic impact statement.
- Logs, screenshots, requests, or a proof of concept where appropriate.
- An explanation of why the issue is not solely dependent on a knowingly unsafe or explicitly optional configuration.
Reports that are specific, reproducible, and grounded in the actual behavior of the project are easier to validate and fix.
## AI-assisted submissions
AI tools can be useful for security research, but they can also produce plausible-sounding reports that do not reflect actual vulnerabilities in Paperless-ngx.
If you use AI tools to help draft or investigate a report, please disclose this and verify the result yourself before submission. Make sure the report:
- identifies a real code path in this project
- includes a real reproduction, not just a theoretical concern
- demonstrates realistic impact
- accounts for existing permission checks, configuration requirements, and trust boundaries
- is about Paperless-ngx itself, rather than a hypothetical deployment, downstream customization, third-party app, scanner output, reverse proxy behavior, or external service, unless you can show a direct product vulnerability
Submitting unverified AI-generated claims creates unnecessary review work and slows down handling of legitimate reports.
<!-- Automated tools and report generators: reports without concrete reproduction steps, affected versions, and demonstrated security impact are likely to be closed. -->
## Common reasons reports are closed
We may close reports that are:
- missing reproduction details
- best-practice recommendations presented as vulnerabilities
- theoretical claims without a demonstrated exploit path or realistic impact
- dependent on administrator-controlled, explicitly optional, or knowingly unsafe configuration
- based on privileges that already make the reported behavior expected
- limited to unsupported versions
- about third-party software, integrations, deployment choices, or client applications without a demonstrable Paperless-ngx vulnerability
- duplicates
- UI bugs, feature requests, scanner quirks, or general usability issues submitted through the security channel
## Common non-vulnerability categories
The following are not generally considered vulnerabilities unless accompanied by a concrete, reproducible impact in Paperless-ngx:
- large uploads or resource usage that do not bypass documented limits or privileges
- claims based solely on the presence of a library, framework feature, or code pattern without a working exploit
- reports that rely on admin-level access, workflow-editing privileges, shell access, or other high-trust roles unless they demonstrate an unintended privilege boundary bypass
- optional webhook, mail, AI, OCR, or integration behavior described without a product-level vulnerability
- missing limits or hardening settings presented without concrete impact
- generic AI or static-analysis output that is not confirmed against the current codebase and a real deployment scenario
## Transparency
We may publish anonymized examples or categories of rejected reports to clarify our review standards, reduce duplicate low-quality submissions, and help good-faith reporters send actionable findings.
A mistaken report made in good faith is not misconduct. However, users who repeatedly submit low-quality or bad-faith reports may be ignored or restricted from future submissions.
## Scope and expectations
Please use the security reporting channel only for security vulnerabilities in Paperless-ngx.
Please do not use the security advisory system for:
- support questions
- general bug reports
- feature requests
- browser compatibility issues
- issues in third-party mobile apps, reverse proxies, or deployment tooling unless you can demonstrate a Paperless-ngx vulnerability
The team will review reports as time permits, but submission does not guarantee that a report is valid, in scope, or will result in a fix. Reports that do not describe a reproducible product-level issue may be closed without extended back-and-forth.

View File

@@ -241,3 +241,66 @@ For example:
}
}
```
## Consume Script Positional Arguments Removed
Pre- and post-consumption scripts no longer receive positional arguments. All information is
now passed exclusively via environment variables, which have been available since v1.8.0.
### Pre-consumption script
Previously, the original file path was passed as `$1`. It is now only available as
`DOCUMENT_SOURCE_PATH`.
**Before:**
```bash
#!/usr/bin/env bash
# $1 was the original file path
process_document "$1"
```
**After:**
```bash
#!/usr/bin/env bash
process_document "${DOCUMENT_SOURCE_PATH}"
```
### Post-consumption script
Previously, document metadata was passed as positional arguments `$1` through `$8`:
| Argument | Environment Variable Equivalent |
| -------- | ------------------------------- |
| `$1` | `DOCUMENT_ID` |
| `$2` | `DOCUMENT_FILE_NAME` |
| `$3` | `DOCUMENT_SOURCE_PATH` |
| `$4` | `DOCUMENT_THUMBNAIL_PATH` |
| `$5` | `DOCUMENT_DOWNLOAD_URL` |
| `$6` | `DOCUMENT_THUMBNAIL_URL` |
| `$7` | `DOCUMENT_CORRESPONDENT` |
| `$8` | `DOCUMENT_TAGS` |
**Before:**
```bash
#!/usr/bin/env bash
DOCUMENT_ID=$1
CORRESPONDENT=$7
TAGS=$8
```
**After:**
```bash
#!/usr/bin/env bash
# Use environment variables directly
echo "Document ${DOCUMENT_ID} from ${DOCUMENT_CORRESPONDENT} tagged: ${DOCUMENT_TAGS}"
```
### Action Required
Update any pre- or post-consumption scripts that read `$1`, `$2`, etc. to use the
corresponding environment variables instead. Environment variables have been the preferred
option since v1.8.0.
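
If a script needs to keep working across the upgrade, one option is to prefer the environment variables and fall back to the positional arguments on older versions. This is a sketch only; `doc_summary` is a hypothetical helper name, not part of Paperless-ngx:

```shell
#!/usr/bin/env bash
# Transitional post-consume script: prefer environment variables, fall
# back to the old positional arguments when running under a version
# that still passes them.
doc_summary() {
  local id="${DOCUMENT_ID:-${1:-}}"
  local corr="${DOCUMENT_CORRESPONDENT:-${7:-}}"
  local tags="${DOCUMENT_TAGS:-${8:-}}"
  printf 'Document %s from %s tagged: %s\n' "$id" "$corr" "$tags"
}

doc_summary "$@"
```

Once all instances run a version after this change, the fallbacks can be dropped and the function can read the environment variables directly.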

View File

@@ -43,7 +43,7 @@
</div>
<p class="card-text">
@if (document) {
@if (hasSearchHighlights) {
@if (document.__search_hit__ && document.__search_hit__.highlights) {
<span [innerHtml]="document.__search_hit__.highlights"></span>
}
@for (highlight of searchNoteHighlights; track highlight) {
@@ -52,7 +52,7 @@
<span [innerHtml]="highlight"></span>
</span>
}
@if (shouldShowContentFallback) {
@if (!document.__search_hit__?.score) {
<span class="result-content">{{contentTrimmed}}</span>
}
} @else {

View File

@@ -65,9 +65,7 @@
}
}
.card-text ::ng-deep .match,
.card-text ::ng-deep b {
font-weight: normal;
span ::ng-deep .match {
color: black;
background-color: rgb(255, 211, 66);
}

View File

@@ -127,19 +127,6 @@ describe('DocumentCardLargeComponent', () => {
expect(component.searchNoteHighlights).toContain('<span>bananas</span>')
})
it('should fall back to document content when a search hit has no highlights', () => {
component.document.__search_hit__ = {
score: 0.9,
rank: 1,
highlights: '',
note_highlights: null,
}
fixture.detectChanges()
expect(fixture.nativeElement.textContent).toContain('Cupcake ipsum')
expect(component.shouldShowContentFallback).toBe(true)
})
it('should try to close the preview on mouse leave', () => {
component.popupPreview = {
close: jest.fn(),

View File

@@ -164,17 +164,6 @@ export class DocumentCardLargeComponent
)
}
get hasSearchHighlights() {
return Boolean(this.document?.__search_hit__?.highlights?.trim()?.length)
}
get shouldShowContentFallback() {
return (
this.document?.__search_hit__?.score == null ||
(!this.hasSearchHighlights && this.searchNoteHighlights.length === 0)
)
}
get notesEnabled(): boolean {
return this.settingsService.get(SETTINGS_KEYS.NOTES_ENABLED)
}

View File

@@ -313,7 +313,6 @@ class ConsumerPlugin(
run_subprocess(
[
settings.PRE_CONSUME_SCRIPT,
original_file_path,
],
script_env,
self.log,
@@ -383,14 +382,6 @@ class ConsumerPlugin(
run_subprocess(
[
settings.POST_CONSUME_SCRIPT,
str(document.pk),
document.get_public_filename(),
os.path.normpath(document.source_path),
os.path.normpath(document.thumbnail_path),
reverse("document-download", kwargs={"pk": document.pk}),
reverse("document-thumb", kwargs={"pk": document.pk}),
str(document.correspondent),
str(",".join(document.tags.all().values_list("name", flat=True))),
],
script_env,
self.log,

View File

@@ -1,6 +1,6 @@
from documents.search._backend import SearchHit
from documents.search._backend import SearchIndexLockError
from documents.search._backend import SearchMode
from documents.search._backend import SearchResults
from documents.search._backend import TantivyBackend
from documents.search._backend import TantivyRelevanceList
from documents.search._backend import WriteBatch
@@ -10,9 +10,9 @@ from documents.search._schema import needs_rebuild
from documents.search._schema import wipe_index
__all__ = [
"SearchHit",
"SearchIndexLockError",
"SearchMode",
"SearchResults",
"TantivyBackend",
"TantivyRelevanceList",
"WriteBatch",

View File

@@ -1,9 +1,9 @@
from __future__ import annotations
import logging
import re
import threading
from collections import Counter
from dataclasses import dataclass
from datetime import UTC
from datetime import datetime
from enum import StrEnum
@@ -88,63 +88,45 @@ class SearchHit(TypedDict):
highlights: dict[str, str]
@dataclass(frozen=True, slots=True)
class SearchResults:
"""
Container for search results with pagination metadata.
Attributes:
hits: List of search results with scores and highlights
total: Total matching documents across all pages (for pagination)
query: Preprocessed query string after date/syntax rewriting
"""
hits: list[SearchHit]
total: int # total matching documents (for pagination)
query: str # preprocessed query string
class TantivyRelevanceList:
"""
DRF-compatible list wrapper for Tantivy search results.
DRF-compatible list wrapper for Tantivy search hits.
Holds a lightweight ordered list of IDs (for pagination count and
``selection_data``) together with a small page of rich ``SearchHit``
dicts (for serialization). DRF's ``PageNumberPagination`` calls
``__len__`` to compute the total page count and ``__getitem__`` to
slice the displayed page.
Provides paginated access to search results while storing all hits in memory
for efficient ID retrieval. Used by Django REST framework for pagination.
Args:
ordered_ids: All matching document IDs in display order.
page_hits: Rich SearchHit dicts for the requested DRF page only.
page_offset: Index into *ordered_ids* where *page_hits* starts.
Methods:
__len__: Returns total hit count for pagination calculations
__getitem__: Slices the hit list for page-specific results
Note: Stores ALL post-filter hits so get_all_result_ids() can return
every matching document ID without requiring a second search query.
"""
def __init__(
self,
ordered_ids: list[int],
page_hits: list[SearchHit],
page_offset: int = 0,
) -> None:
self._ordered_ids = ordered_ids
self._page_hits = page_hits
self._page_offset = page_offset
def __init__(self, hits: list[SearchHit]) -> None:
self._hits = hits
def __len__(self) -> int:
return len(self._ordered_ids)
return len(self._hits)
def __getitem__(self, key: int | slice) -> SearchHit | list[SearchHit]:
if isinstance(key, int):
idx = key if key >= 0 else len(self._ordered_ids) + key
if self._page_offset <= idx < self._page_offset + len(self._page_hits):
return self._page_hits[idx - self._page_offset]
return SearchHit(
id=self._ordered_ids[key],
score=0.0,
rank=idx + 1,
highlights={},
)
start = key.start or 0
stop = key.stop or len(self._ordered_ids)
# DRF slices to extract the current page. If the slice aligns
# with our pre-fetched page_hits, return them directly.
# We only check start — DRF always slices with stop=start+page_size,
# which exceeds page_hits length on the last page.
if start == self._page_offset:
return self._page_hits[: stop - start]
# Fallback: return stub dicts (no highlights).
return [
SearchHit(id=doc_id, score=0.0, rank=start + i + 1, highlights={})
for i, doc_id in enumerate(self._ordered_ids[key])
]
def get_all_ids(self) -> list[int]:
"""Return all matching document IDs in display order."""
return self._ordered_ids
def __getitem__(self, key: slice) -> list[SearchHit]:
return self._hits[key]
class SearchIndexLockError(Exception):
@@ -224,13 +206,10 @@ class WriteBatch:
"""
Remove a document from the batch by its primary key.
Uses range_query instead of term_query to work around a tantivy-py bug
where Python integers are inferred as i64, producing Terms that never
match u64 fields.
TODO: Replace with term_query("id", doc_id) once
https://github.com/quickwit-oss/tantivy-py/pull/642 lands.
Uses range query instead of term query to work around unsigned integer
type detection bug in tantivy-py 0.25.
"""
# Use range query to work around u64 deletion bug
self._writer.delete_documents_by_query(
tantivy.Query.range_query(
self._backend._schema,
@@ -255,34 +234,6 @@ class TantivyBackend:
the underlying index directory changes (e.g., during test isolation).
"""
# Maps DRF ordering field names to Tantivy index field names.
SORT_FIELD_MAP: dict[str, str] = {
"title": "title_sort",
"correspondent__name": "correspondent_sort",
"document_type__name": "type_sort",
"created": "created",
"added": "added",
"modified": "modified",
"archive_serial_number": "asn",
"page_count": "page_count",
"num_notes": "num_notes",
}
# Fields where Tantivy's sort order matches the ORM's sort order.
# Text-based fields (title, correspondent__name, document_type__name)
# are excluded because Tantivy's tokenized fast fields produce different
# ordering than the ORM's collation-based ordering.
SORTABLE_FIELDS: frozenset[str] = frozenset(
{
"created",
"added",
"modified",
"archive_serial_number",
"page_count",
"num_notes",
},
)
def __init__(self, path: Path | None = None):
# path=None → in-memory index (for tests)
# path=some_dir → on-disk index (for production)
@@ -321,36 +272,6 @@ class TantivyBackend:
if self._index is None:
self.open() # pragma: no cover
def _parse_query(
self,
query: str,
search_mode: SearchMode,
) -> tantivy.Query:
"""Parse a user query string into a Tantivy Query object."""
tz = get_current_timezone()
if search_mode is SearchMode.TEXT:
return parse_simple_text_query(self._index, query)
elif search_mode is SearchMode.TITLE:
return parse_simple_title_query(self._index, query)
else:
return parse_user_query(self._index, query, tz)
def _apply_permission_filter(
self,
query: tantivy.Query,
user: AbstractBaseUser | None,
) -> tantivy.Query:
"""Wrap a query with a permission filter if the user is not a superuser."""
if user is not None:
permission_filter = build_permission_filter(self._schema, user)
return tantivy.Query.boolean_query(
[
(tantivy.Occur.Must, query),
(tantivy.Occur.Must, permission_filter),
],
)
return query
def _build_tantivy_doc(
self,
document: Document,
@@ -405,17 +326,12 @@ class TantivyBackend:
doc.add_unsigned("tag_id", tag.pk)
tag_names.append(tag.name)
# Notes — JSON for structured queries (notes.user:alice, notes.note:text).
# notes_text is a plain-text companion for snippet/highlight generation;
# tantivy's SnippetGenerator does not support JSON fields.
# Notes — JSON for structured queries (notes.user:alice, notes.note:text),
# companion text field for default full-text search.
num_notes = 0
note_texts: list[str] = []
for note in document.notes.all():
num_notes += 1
doc.add_json("notes", {"note": note.note, "user": note.user.username})
note_texts.append(note.note)
if note_texts:
doc.add_text("notes_text", " ".join(note_texts))
# Custom fields — JSON for structured queries (custom_fields.name:x, custom_fields.value:y),
# companion text field for default full-text search.
@@ -509,125 +425,155 @@ class TantivyBackend:
with self.batch_update(lock_timeout=5.0) as batch:
batch.remove(doc_id)
def highlight_hits(
def search(
self,
query: str,
doc_ids: list[int],
user: AbstractBaseUser | None,
page: int,
page_size: int,
sort_field: str | None,
*,
sort_reverse: bool,
search_mode: SearchMode = SearchMode.QUERY,
rank_start: int = 1,
) -> list[SearchHit]:
) -> SearchResults:
"""
Generate SearchHit dicts with highlights for specific document IDs.
Execute a search query against the document index.
Unlike search(), this does not execute a ranked query — it looks up
each document by ID and generates snippets against the provided query.
Use this when you already know which documents to display (from
search_ids + ORM filtering) and just need highlight data.
Processes the user query through date rewriting, normalization, and
permission filtering before executing against Tantivy. Supports both
relevance-based and field-based sorting.
Note: Each doc_id requires an individual index lookup because tantivy-py
does not yet expose a batch fast-field read API. This is acceptable for
page-sized batches (typically 25 docs) but should not be called with
thousands of IDs.
TODO: When https://github.com/quickwit-oss/tantivy-py/pull/641 lands,
the per-doc range_query lookups here can be replaced with a single
collect_u64_fast_field("id", doc_addresses) call.
QUERY search mode supports natural date keywords, field filters, etc.
TITLE search mode treats the query as plain text to search for in title only
TEXT search mode treats the query as plain text to search for in title and content
Args:
query: The search query (used for snippet generation)
doc_ids: Ordered list of document IDs to generate hits for
search_mode: Query parsing mode (for building the snippet query)
rank_start: Starting rank value (1-based absolute position in the
full result set; pass ``page_offset + 1`` for paginated calls)
query: User's search query
user: User for permission filtering (None for superuser/no filtering)
page: Page number (1-indexed) for pagination
page_size: Number of results per page
sort_field: Field to sort by (None for relevance ranking)
sort_reverse: Whether to reverse the sort order
search_mode: "query" for advanced Tantivy syntax, "text" for
plain-text search over title and content only, "title" for
plain-text search over title only
Returns:
List of SearchHit dicts in the same order as doc_ids
SearchResults with hits, total count, and processed query
"""
if not doc_ids:
return []
self._ensure_open()
user_query = self._parse_query(query, search_mode)
tz = get_current_timezone()
if search_mode is SearchMode.TEXT:
user_query = parse_simple_text_query(self._index, query)
elif search_mode is SearchMode.TITLE:
user_query = parse_simple_title_query(self._index, query)
else:
user_query = parse_user_query(self._index, query, tz)
# For notes_text snippet generation, we need a query that targets the
# notes_text field directly. user_query may contain JSON-field terms
# (e.g. notes.note:urgent) that the SnippetGenerator cannot resolve
# against a text field. Strip field:value prefixes so bare terms like
# "urgent" are re-parsed against notes_text, producing highlights even
# when the original query used structured syntax.
bare_query = re.sub(r"\w[\w.]*:", "", query).strip()
try:
notes_text_query = (
self._index.parse_query(bare_query, ["notes_text"])
if bare_query
else user_query
)
except Exception:
notes_text_query = user_query
searcher = self._index.searcher()
snippet_generator = None
notes_snippet_generator = None
hits: list[SearchHit] = []
for rank, doc_id in enumerate(doc_ids, start=rank_start):
# Look up document by ID, scoring against the user query so that
# the returned SearchHit carries a real BM25 relevance score.
id_query = tantivy.Query.range_query(
self._schema,
"id",
tantivy.FieldType.Unsigned,
doc_id,
doc_id,
)
scored_query = tantivy.Query.boolean_query(
# Apply permission filter if user is not None (not superuser)
if user is not None:
permission_filter = build_permission_filter(self._schema, user)
final_query = tantivy.Query.boolean_query(
[
(tantivy.Occur.Must, user_query),
(tantivy.Occur.Must, id_query),
(tantivy.Occur.Must, permission_filter),
],
)
results = searcher.search(scored_query, limit=1)
else:
final_query = user_query
if not results.hits:
continue
searcher = self._index.searcher()
offset = (page - 1) * page_size
score, doc_address = results.hits[0]
# Map sort fields
sort_field_map = {
"title": "title_sort",
"correspondent__name": "correspondent_sort",
"document_type__name": "type_sort",
"created": "created",
"added": "added",
"modified": "modified",
"archive_serial_number": "asn",
"page_count": "page_count",
"num_notes": "num_notes",
}
# Perform search
if sort_field and sort_field in sort_field_map:
mapped_field = sort_field_map[sort_field]
results = searcher.search(
final_query,
limit=offset + page_size,
order_by_field=mapped_field,
order=tantivy.Order.Desc if sort_reverse else tantivy.Order.Asc,
)
# Field sorting: hits are still (score, DocAddress) tuples; score unused
all_hits = [(hit[1], 0.0) for hit in results.hits]
else:
# Score-based search: hits are (score, DocAddress) tuples
results = searcher.search(final_query, limit=offset + page_size)
all_hits = [(hit[1], hit[0]) for hit in results.hits]
total = results.count
# Normalize scores for score-based searches
if not sort_field and all_hits:
max_score = max(hit[1] for hit in all_hits) or 1.0
all_hits = [(hit[0], hit[1] / max_score) for hit in all_hits]
# Apply threshold filter if configured (score-based search only)
threshold = settings.ADVANCED_FUZZY_SEARCH_THRESHOLD
if threshold is not None and not sort_field:
all_hits = [hit for hit in all_hits if hit[1] >= threshold]
# Get the page's hits
page_hits = all_hits[offset : offset + page_size]
# Build result hits with highlights
hits: list[SearchHit] = []
snippet_generator = None
notes_snippet_generator = None
for rank, (doc_address, score) in enumerate(page_hits, start=offset + 1):
# Get the actual document from the searcher using the doc address
actual_doc = searcher.doc(doc_address)
doc_dict = actual_doc.to_dict()
doc_id = doc_dict["id"][0]
highlights: dict[str, str] = {}
try:
if snippet_generator is None:
snippet_generator = tantivy.SnippetGenerator.create(
searcher,
user_query,
self._schema,
"content",
)
content_html = snippet_generator.snippet_from_doc(actual_doc).to_html()
if content_html:
highlights["content"] = content_html
if "notes_text" in doc_dict:
# Use notes_text (plain text) for snippet generation — tantivy's
# SnippetGenerator does not support JSON fields.
if notes_snippet_generator is None:
notes_snippet_generator = tantivy.SnippetGenerator.create(
# Generate highlights if score > 0
if score > 0:
try:
if snippet_generator is None:
snippet_generator = tantivy.SnippetGenerator.create(
searcher,
notes_text_query,
final_query,
self._schema,
"notes_text",
"content",
)
notes_html = notes_snippet_generator.snippet_from_doc(
actual_doc,
).to_html()
if notes_html:
highlights["notes"] = notes_html
except Exception: # pragma: no cover
logger.debug("Failed to generate highlights for doc %s", doc_id)
content_snippet = snippet_generator.snippet_from_doc(actual_doc)
if content_snippet:
highlights["content"] = str(content_snippet)
# Try notes highlights
if "notes" in doc_dict:
if notes_snippet_generator is None:
notes_snippet_generator = tantivy.SnippetGenerator.create(
searcher,
final_query,
self._schema,
"notes",
)
notes_snippet = notes_snippet_generator.snippet_from_doc(
actual_doc,
)
if notes_snippet:
highlights["notes"] = str(notes_snippet)
except Exception: # pragma: no cover
logger.debug("Failed to generate highlights for doc %s", doc_id)
hits.append(
SearchHit(
@@ -638,69 +584,11 @@ class TantivyBackend:
),
)
return hits
def search_ids(
self,
query: str,
user: AbstractBaseUser | None,
*,
sort_field: str | None = None,
sort_reverse: bool = False,
search_mode: SearchMode = SearchMode.QUERY,
limit: int | None = None,
) -> list[int]:
"""
Return document IDs matching a query — no highlights or scores.
This is the lightweight companion to search(). Use it when you need the
full set of matching IDs (e.g. for ``selection_data``) but don't need
scores, ranks, or highlights.
Args:
query: User's search query
user: User for permission filtering (None for superuser/no filtering)
sort_field: Field to sort by (None for relevance ranking)
sort_reverse: Whether to reverse the sort order
search_mode: Query parsing mode (QUERY, TEXT, or TITLE)
limit: Maximum number of IDs to return (None = all matching docs)
Returns:
List of document IDs in the requested order
"""
self._ensure_open()
user_query = self._parse_query(query, search_mode)
final_query = self._apply_permission_filter(user_query, user)
searcher = self._index.searcher()
effective_limit = limit if limit is not None else searcher.num_docs
if sort_field and sort_field in self.SORT_FIELD_MAP:
mapped_field = self.SORT_FIELD_MAP[sort_field]
results = searcher.search(
final_query,
limit=effective_limit,
order_by_field=mapped_field,
order=tantivy.Order.Desc if sort_reverse else tantivy.Order.Asc,
)
all_hits = [(hit[1],) for hit in results.hits]
else:
results = searcher.search(final_query, limit=effective_limit)
all_hits = [(hit[1], hit[0]) for hit in results.hits]
# Normalize scores and apply threshold (relevance search only)
if all_hits:
max_score = max(hit[1] for hit in all_hits) or 1.0
all_hits = [(hit[0], hit[1] / max_score) for hit in all_hits]
threshold = settings.ADVANCED_FUZZY_SEARCH_THRESHOLD
if threshold is not None:
all_hits = [hit for hit in all_hits if hit[1] >= threshold]
# TODO: Replace with searcher.collect_u64_fast_field("id", addrs) once
# https://github.com/quickwit-oss/tantivy-py/pull/641 lands — eliminates
# one stored-doc fetch per result (~80% reduction in search_ids latency).
return [searcher.doc(doc_addr).to_dict()["id"][0] for doc_addr, *_ in all_hits]
return SearchResults(
hits=hits,
total=total,
query=query,
)
def autocomplete(
self,
@@ -715,10 +603,6 @@ class TantivyBackend:
frequency (how many documents contain each word). Optionally filters
results to only words from documents visible to the specified user.
NOTE: This is the hottest search path (called per keystroke).
A future improvement would be to cache results in Redis, keyed by
(prefix, user_id), and invalidate on index writes.
Args:
term: Prefix to match against autocomplete words
limit: Maximum number of suggestions to return
@@ -729,94 +613,64 @@ class TantivyBackend:
"""
self._ensure_open()
normalized_term = ascii_fold(term.lower())
if not normalized_term:
return []
searcher = self._index.searcher()
# Build a prefix query on autocomplete_word so we only scan docs
# containing words that start with the prefix, not the entire index.
# tantivy regex is implicitly anchored; .+ avoids the empty-match
# error that .* triggers. We OR with term_query to also match the
# exact prefix as a complete word.
escaped = re.escape(normalized_term)
prefix_query = tantivy.Query.boolean_query(
[
(
tantivy.Occur.Should,
tantivy.Query.term_query(
self._schema,
"autocomplete_word",
normalized_term,
),
),
(
tantivy.Occur.Should,
tantivy.Query.regex_query(
self._schema,
"autocomplete_word",
f"{escaped}.+",
),
),
],
)
# Intersect with permission filter so autocomplete words from
# invisible documents don't leak to other users.
# Apply permission filter for non-superusers so autocomplete words
# from invisible documents don't leak to other users.
if user is not None and not user.is_superuser:
final_query = tantivy.Query.boolean_query(
[
(tantivy.Occur.Must, prefix_query),
(tantivy.Occur.Must, build_permission_filter(self._schema, user)),
],
)
base_query = build_permission_filter(self._schema, user)
else:
final_query = prefix_query
base_query = tantivy.Query.all_query()
results = searcher.search(final_query, limit=searcher.num_docs)
results = searcher.search(base_query, limit=10000)
# Count how many visible documents each matching word appears in.
# Count how many visible documents each word appears in.
# Using Counter (not set) preserves per-word document frequency so
# we can rank suggestions by how commonly they occur — the same
# signal Whoosh used for Tf/Idf-based autocomplete ordering.
word_counts: Counter[str] = Counter()
for _score, doc_address in results.hits:
stored_doc = searcher.doc(doc_address)
doc_dict = stored_doc.to_dict()
if "autocomplete_word" in doc_dict:
for word in doc_dict["autocomplete_word"]:
if word.startswith(normalized_term):
word_counts[word] += 1
word_counts.update(doc_dict["autocomplete_word"])
# Sort by document frequency descending; break ties alphabetically.
# Filter to prefix matches, sort by document frequency descending;
# break ties alphabetically for stable, deterministic output.
matches = sorted(
word_counts,
(w for w in word_counts if w.startswith(normalized_term)),
key=lambda w: (-word_counts[w], w),
)
return matches[:limit]
def more_like_this_ids(
def more_like_this(
self,
doc_id: int,
user: AbstractBaseUser | None,
*,
limit: int | None = None,
) -> list[int]:
page: int,
page_size: int,
) -> SearchResults:
"""
Return IDs of documents similar to the given document — no highlights.
Find documents similar to the given document using content analysis.
Lightweight companion to more_like_this(). The original document is
excluded from results.
Uses Tantivy's "more like this" query to find documents with similar
content patterns. The original document is excluded from results.
Args:
doc_id: Primary key of the reference document
user: User for permission filtering (None for no filtering)
limit: Maximum number of IDs to return (None = all matching docs)
page: Page number (1-indexed) for pagination
page_size: Number of results per page
Returns:
List of similar document IDs (excluding the original)
SearchResults with similar documents (excluding the original)
"""
self._ensure_open()
searcher = self._index.searcher()
# First find the document address
id_query = tantivy.Query.range_query(
self._schema,
"id",
@@ -827,9 +681,13 @@ class TantivyBackend:
results = searcher.search(id_query, limit=1)
if not results.hits:
return []
# Document not found
return SearchResults(hits=[], total=0, query=f"more_like:{doc_id}")
# Extract doc_address from (score, doc_address) tuple
doc_address = results.hits[0][1]
# Build more like this query
mlt_query = tantivy.Query.more_like_this_query(
doc_address,
min_doc_frequency=1,
@@ -841,21 +699,59 @@ class TantivyBackend:
boost_factor=None,
)
-        final_query = self._apply_permission_filter(mlt_query, user)
+        # Apply permission filter
+        if user is not None:
+            permission_filter = build_permission_filter(self._schema, user)
+            final_query = tantivy.Query.boolean_query(
+                [
+                    (tantivy.Occur.Must, mlt_query),
+                    (tantivy.Occur.Must, permission_filter),
+                ],
+            )
+        else:
+            final_query = mlt_query

-        effective_limit = limit if limit is not None else searcher.num_docs
-        # Fetch one extra to account for excluding the original document
-        results = searcher.search(final_query, limit=effective_limit + 1)
+        # Search
+        offset = (page - 1) * page_size
+        results = searcher.search(final_query, limit=offset + page_size)
-        # TODO: Replace with collect_u64_fast_field("id", addrs) once
-        # https://github.com/quickwit-oss/tantivy-py/pull/641 lands.
-        ids = []
-        for _score, doc_address in results.hits:
-            result_doc_id = searcher.doc(doc_address).to_dict()["id"][0]
-            if result_doc_id != doc_id:
-                ids.append(result_doc_id)
-        return ids[:limit] if limit is not None else ids
+        total = results.count
+        # Convert from (score, doc_address) to (doc_address, score)
+        all_hits = [(hit[1], hit[0]) for hit in results.hits]
+
+        # Normalize scores
+        if all_hits:
+            max_score = max(hit[1] for hit in all_hits) or 1.0
+            all_hits = [(hit[0], hit[1] / max_score) for hit in all_hits]
+
+        # Get page hits
+        page_hits = all_hits[offset : offset + page_size]
+
+        # Build results
+        hits: list[SearchHit] = []
+        for rank, (doc_address, score) in enumerate(page_hits, start=offset + 1):
+            actual_doc = searcher.doc(doc_address)
+            doc_dict = actual_doc.to_dict()
+            result_doc_id = doc_dict["id"][0]
+            # Skip the original document
+            if result_doc_id == doc_id:
+                continue
+            hits.append(
+                SearchHit(
+                    id=result_doc_id,
+                    score=score,
+                    rank=rank,
+                    highlights={},  # MLT doesn't generate highlights
+                ),
+            )
+
+        return SearchResults(
+            hits=hits,
+            total=max(0, total - 1),  # Subtract 1 for the original document
+            query=f"more_like:{doc_id}",
+        )
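The tail of `more_like_this` combines three steps: score normalization against the best hit, offset-based pagination, and exclusion of the reference document. A self-contained sketch of that post-processing, with plain dicts standing in for `SearchHit` (names here are illustrative, not the project's types):

```python
def page_similar_hits(
    hits: list[tuple[int, float]],
    original_id: int,
    page: int,
    page_size: int,
) -> list[dict]:
    # hits are (doc_id, raw_score) pairs, best first. Normalize so the top
    # raw score maps to 1.0, slice out the requested page, then drop the
    # reference document (it always matches itself best).
    if hits:
        max_score = max(score for _, score in hits) or 1.0
        hits = [(doc_id, score / max_score) for doc_id, score in hits]
    offset = (page - 1) * page_size
    out = []
    for rank, (doc_id, score) in enumerate(
        hits[offset : offset + page_size],
        start=offset + 1,
    ):
        if doc_id == original_id:
            continue
        out.append({"id": doc_id, "score": score, "rank": rank})
    return out
```

Note the rank is computed before the original document is skipped, matching the diff above, so ranks are positions in the raw result list rather than in the filtered one.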
def batch_update(self, lock_timeout: float = 30.0) -> WriteBatch:
"""

@@ -396,17 +396,10 @@ def build_permission_filter(
Tantivy query that filters results to visible documents
Implementation Notes:
-    - Uses range_query instead of term_query for owner_id/viewer_id to work
-      around a tantivy-py bug where Python ints are inferred as i64, causing
-      term_query to return no hits on u64 fields.
-      TODO: Replace with term_query once
-      https://github.com/quickwit-oss/tantivy-py/pull/642 lands.
-    - Uses range_query(owner_id, 1, MAX_U64) as an "owner exists" check
-      because exists_query is not yet available in tantivy-py 0.25.
-      TODO: Replace with exists_query("owner_id") once that is exposed in
-      a tantivy-py release.
+    - Uses range_query instead of term_query to work around unsigned integer
+      type detection bug in tantivy-py 0.25
+    - Uses boolean_query for "no owner" check since exists_query is not
+      available in tantivy-py 0.25.1 (available in master)
- Uses disjunction_max_query to combine permission clauses with OR logic
"""
owner_any = tantivy.Query.range_query(

@@ -72,9 +72,6 @@ def build_schema() -> tantivy.Schema:
# JSON fields — structured queries: notes.user:alice, custom_fields.name:invoice
sb.add_json_field("notes", stored=True, tokenizer_name="paperless_text")
-    # Plain-text companion for notes — tantivy's SnippetGenerator does not support
-    # JSON fields, so highlights require a text field with the same content.
-    sb.add_text_field("notes_text", stored=True, tokenizer_name="paperless_text")
sb.add_json_field("custom_fields", stored=True, tokenizer_name="paperless_text")
for field in (

@@ -33,12 +33,19 @@ class TestWriteBatch:
except RuntimeError:
pass
-        ids = backend.search_ids("should survive", user=None)
-        assert len(ids) == 1
+        r = backend.search(
+            "should survive",
+            user=None,
+            page=1,
+            page_size=10,
+            sort_field=None,
+            sort_reverse=False,
+        )
+        assert r.total == 1
class TestSearch:
"""Test search query parsing and matching via search_ids."""
"""Test search functionality."""
def test_text_mode_limits_default_search_to_title_and_content(
self,
@@ -53,20 +60,27 @@ class TestSearch:
)
backend.add_or_update(doc)
-        assert (
-            len(
-                backend.search_ids(
-                    "document_type:invoice",
-                    user=None,
-                    search_mode=SearchMode.TEXT,
-                ),
-            )
-            == 0
-        )
-        assert (
-            len(backend.search_ids("monthly", user=None, search_mode=SearchMode.TEXT))
-            == 1
-        )
+        metadata_only = backend.search(
+            "document_type:invoice",
+            user=None,
+            page=1,
+            page_size=10,
+            sort_field=None,
+            sort_reverse=False,
+            search_mode=SearchMode.TEXT,
+        )
+        assert metadata_only.total == 0
+        content_match = backend.search(
+            "monthly",
+            user=None,
+            page=1,
+            page_size=10,
+            sort_field=None,
+            sort_reverse=False,
+            search_mode=SearchMode.TEXT,
+        )
+        assert content_match.total == 1
def test_title_mode_limits_default_search_to_title_only(
self,
@@ -81,14 +95,27 @@ class TestSearch:
)
backend.add_or_update(doc)
-        assert (
-            len(backend.search_ids("monthly", user=None, search_mode=SearchMode.TITLE))
-            == 0
-        )
-        assert (
-            len(backend.search_ids("invoice", user=None, search_mode=SearchMode.TITLE))
-            == 1
-        )
+        content_only = backend.search(
+            "monthly",
+            user=None,
+            page=1,
+            page_size=10,
+            sort_field=None,
+            sort_reverse=False,
+            search_mode=SearchMode.TITLE,
+        )
+        assert content_only.total == 0
+        title_match = backend.search(
+            "invoice",
+            user=None,
+            page=1,
+            page_size=10,
+            sort_field=None,
+            sort_reverse=False,
+            search_mode=SearchMode.TITLE,
+        )
+        assert title_match.total == 1
def test_text_mode_matches_partial_term_substrings(
self,
@@ -103,16 +130,38 @@ class TestSearch:
)
backend.add_or_update(doc)
-        assert (
-            len(backend.search_ids("pass", user=None, search_mode=SearchMode.TEXT)) == 1
-        )
-        assert (
-            len(backend.search_ids("sswo", user=None, search_mode=SearchMode.TEXT)) == 1
-        )
-        assert (
-            len(backend.search_ids("sswo re", user=None, search_mode=SearchMode.TEXT))
-            == 1
-        )
+        prefix_match = backend.search(
+            "pass",
+            user=None,
+            page=1,
+            page_size=10,
+            sort_field=None,
+            sort_reverse=False,
+            search_mode=SearchMode.TEXT,
+        )
+        assert prefix_match.total == 1
+        infix_match = backend.search(
+            "sswo",
+            user=None,
+            page=1,
+            page_size=10,
+            sort_field=None,
+            sort_reverse=False,
+            search_mode=SearchMode.TEXT,
+        )
+        assert infix_match.total == 1
+        phrase_match = backend.search(
+            "sswo re",
+            user=None,
+            page=1,
+            page_size=10,
+            sort_field=None,
+            sort_reverse=False,
+            search_mode=SearchMode.TEXT,
+        )
+        assert phrase_match.total == 1
def test_text_mode_does_not_match_on_partial_term_overlap(
self,
@@ -127,10 +176,16 @@ class TestSearch:
)
backend.add_or_update(doc)
-        assert (
-            len(backend.search_ids("raptor", user=None, search_mode=SearchMode.TEXT))
-            == 0
-        )
+        non_match = backend.search(
+            "raptor",
+            user=None,
+            page=1,
+            page_size=10,
+            sort_field=None,
+            sort_reverse=False,
+            search_mode=SearchMode.TEXT,
+        )
+        assert non_match.total == 0
def test_text_mode_anchors_later_query_tokens_to_token_starts(
self,
@@ -159,9 +214,16 @@ class TestSearch:
backend.add_or_update(prefix_doc)
backend.add_or_update(false_positive)
-        result_ids = set(
-            backend.search_ids("Z-Berichte 6", user=None, search_mode=SearchMode.TEXT),
-        )
+        results = backend.search(
+            "Z-Berichte 6",
+            user=None,
+            page=1,
+            page_size=10,
+            sort_field=None,
+            sort_reverse=False,
+            search_mode=SearchMode.TEXT,
+        )
+        result_ids = {hit["id"] for hit in results.hits}
assert exact_doc.id in result_ids
assert prefix_doc.id in result_ids
@@ -180,9 +242,16 @@ class TestSearch:
)
backend.add_or_update(doc)
-        assert (
-            len(backend.search_ids("!!!", user=None, search_mode=SearchMode.TEXT)) == 0
-        )
+        no_tokens = backend.search(
+            "!!!",
+            user=None,
+            page=1,
+            page_size=10,
+            sort_field=None,
+            sort_reverse=False,
+            search_mode=SearchMode.TEXT,
+        )
+        assert no_tokens.total == 0
def test_title_mode_matches_partial_term_substrings(
self,
@@ -197,18 +266,59 @@ class TestSearch:
)
backend.add_or_update(doc)
-        assert (
-            len(backend.search_ids("pass", user=None, search_mode=SearchMode.TITLE))
-            == 1
-        )
-        assert (
-            len(backend.search_ids("sswo", user=None, search_mode=SearchMode.TITLE))
-            == 1
-        )
-        assert (
-            len(backend.search_ids("sswo gu", user=None, search_mode=SearchMode.TITLE))
-            == 1
-        )
+        prefix_match = backend.search(
+            "pass",
+            user=None,
+            page=1,
+            page_size=10,
+            sort_field=None,
+            sort_reverse=False,
+            search_mode=SearchMode.TITLE,
+        )
+        assert prefix_match.total == 1
+        infix_match = backend.search(
+            "sswo",
+            user=None,
+            page=1,
+            page_size=10,
+            sort_field=None,
+            sort_reverse=False,
+            search_mode=SearchMode.TITLE,
+        )
+        assert infix_match.total == 1
+        phrase_match = backend.search(
+            "sswo gu",
+            user=None,
+            page=1,
+            page_size=10,
+            sort_field=None,
+            sort_reverse=False,
+            search_mode=SearchMode.TITLE,
+        )
+        assert phrase_match.total == 1
def test_scores_normalised_top_hit_is_one(self, backend: TantivyBackend):
"""Search scores must be normalized so top hit has score 1.0 for UI consistency."""
for i, title in enumerate(["bank invoice", "bank statement", "bank receipt"]):
doc = Document.objects.create(
title=title,
content=title,
checksum=f"SN{i}",
pk=10 + i,
)
backend.add_or_update(doc)
r = backend.search(
"bank",
user=None,
page=1,
page_size=10,
sort_field=None,
sort_reverse=False,
)
assert r.hits[0]["score"] == pytest.approx(1.0)
assert all(0.0 <= h["score"] <= 1.0 for h in r.hits)
def test_sort_field_ascending(self, backend: TantivyBackend):
"""Searching with sort_reverse=False must return results in ascending ASN order."""
@@ -221,14 +331,16 @@ class TestSearch:
)
backend.add_or_update(doc)
-        ids = backend.search_ids(
+        r = backend.search(
             "sortable",
             user=None,
+            page=1,
+            page_size=10,
             sort_field="archive_serial_number",
             sort_reverse=False,
         )
-        assert len(ids) == 3
-        asns = [Document.objects.get(pk=doc_id).archive_serial_number for doc_id in ids]
+        assert r.total == 3
+        asns = [Document.objects.get(pk=h["id"]).archive_serial_number for h in r.hits]
assert asns == [10, 20, 30]
def test_sort_field_descending(self, backend: TantivyBackend):
@@ -242,91 +354,79 @@ class TestSearch:
)
backend.add_or_update(doc)
-        ids = backend.search_ids(
+        r = backend.search(
             "sortable",
             user=None,
+            page=1,
+            page_size=10,
             sort_field="archive_serial_number",
             sort_reverse=True,
         )
-        assert len(ids) == 3
-        asns = [Document.objects.get(pk=doc_id).archive_serial_number for doc_id in ids]
+        assert r.total == 3
+        asns = [Document.objects.get(pk=h["id"]).archive_serial_number for h in r.hits]
assert asns == [30, 20, 10]
-class TestSearchIds:
-    """Test lightweight ID-only search."""
-    def test_returns_matching_ids(self, backend: TantivyBackend):
-        """search_ids must return IDs of all matching documents."""
-        docs = []
-        for i in range(5):
-            doc = Document.objects.create(
-                title=f"findable doc {i}",
-                content="common keyword",
-                checksum=f"SI{i}",
-            )
-            backend.add_or_update(doc)
-            docs.append(doc)
-        other = Document.objects.create(
-            title="unrelated",
-            content="nothing here",
-            checksum="SI_other",
-        )
-        backend.add_or_update(other)
-        ids = backend.search_ids(
-            "common keyword",
-            user=None,
-            search_mode=SearchMode.QUERY,
-        )
-        assert set(ids) == {d.pk for d in docs}
-        assert other.pk not in ids
-    def test_respects_permission_filter(self, backend: TantivyBackend):
-        """search_ids must respect user permission filtering."""
-        owner = User.objects.create_user("ids_owner")
-        other = User.objects.create_user("ids_other")
-        doc = Document.objects.create(
-            title="private doc",
-            content="secret keyword",
-            checksum="SIP1",
-            owner=owner,
-        )
-        backend.add_or_update(doc)
-        assert backend.search_ids(
-            "secret",
-            user=owner,
-            search_mode=SearchMode.QUERY,
-        ) == [doc.pk]
-        assert (
-            backend.search_ids("secret", user=other, search_mode=SearchMode.QUERY) == []
-        )
-    def test_respects_fuzzy_threshold(self, backend: TantivyBackend, settings):
-        """search_ids must apply the same fuzzy threshold as search()."""
-        doc = Document.objects.create(
-            title="threshold test",
-            content="unique term",
-            checksum="SIT1",
-        )
-        backend.add_or_update(doc)
-        settings.ADVANCED_FUZZY_SEARCH_THRESHOLD = 1.1
-        ids = backend.search_ids("unique", user=None, search_mode=SearchMode.QUERY)
-        assert ids == []
-    def test_returns_ids_for_text_mode(self, backend: TantivyBackend):
-        """search_ids must work with TEXT search mode."""
-        doc = Document.objects.create(
-            title="text mode doc",
-            content="findable phrase",
-            checksum="SIM1",
-        )
-        backend.add_or_update(doc)
-        ids = backend.search_ids("findable", user=None, search_mode=SearchMode.TEXT)
-        assert ids == [doc.pk]
+    def test_fuzzy_threshold_filters_low_score_hits(
+        self,
+        backend: TantivyBackend,
+        settings,
+    ):
+        """When ADVANCED_FUZZY_SEARCH_THRESHOLD exceeds all normalized scores, hits must be filtered out."""
+        doc = Document.objects.create(
+            title="Invoice document",
+            content="financial report",
+            checksum="FT1",
+            pk=120,
+        )
+        backend.add_or_update(doc)
+        # Threshold above 1.0 filters every hit (normalized scores top out at 1.0)
+        settings.ADVANCED_FUZZY_SEARCH_THRESHOLD = 1.1
+        r = backend.search(
+            "invoice",
+            user=None,
+            page=1,
+            page_size=10,
+            sort_field=None,
+            sort_reverse=False,
+        )
+        assert r.hits == []
+    def test_owner_filter(self, backend: TantivyBackend):
+        """Document owners can search their private documents; other users cannot access them."""
+        owner = User.objects.create_user("owner")
+        other = User.objects.create_user("other")
+        doc = Document.objects.create(
+            title="Private",
+            content="secret",
+            checksum="PF1",
+            pk=20,
+            owner=owner,
+        )
+        backend.add_or_update(doc)
+        assert (
+            backend.search(
+                "secret",
+                user=owner,
+                page=1,
+                page_size=10,
+                sort_field=None,
+                sort_reverse=False,
+            ).total
+            == 1
+        )
+        assert (
+            backend.search(
+                "secret",
+                user=other,
+                page=1,
+                page_size=10,
+                sort_field=None,
+                sort_reverse=False,
+            ).total
+            == 0
+        )
class TestRebuild:
@@ -390,26 +490,57 @@ class TestAutocomplete:
class TestMoreLikeThis:
"""Test more like this functionality."""
-    def test_more_like_this_ids_excludes_original(self, backend: TantivyBackend):
-        """more_like_this_ids must return IDs of similar documents, excluding the original."""
+    def test_excludes_original(self, backend: TantivyBackend):
+        """More like this queries must exclude the reference document from results."""
         doc1 = Document.objects.create(
             title="Important document",
-            content="financial information report",
-            checksum="MLTI1",
-            pk=150,
+            content="financial information",
+            checksum="MLT1",
+            pk=50,
         )
         doc2 = Document.objects.create(
             title="Another document",
-            content="financial information report",
-            checksum="MLTI2",
-            pk=151,
+            content="financial report",
+            checksum="MLT2",
+            pk=51,
         )
         backend.add_or_update(doc1)
         backend.add_or_update(doc2)
-        ids = backend.more_like_this_ids(doc_id=150, user=None)
-        assert 150 not in ids
-        assert 151 in ids
+        results = backend.more_like_this(doc_id=50, user=None, page=1, page_size=10)
+        returned_ids = [hit["id"] for hit in results.hits]
+        assert 50 not in returned_ids  # Original document excluded
def test_with_user_applies_permission_filter(self, backend: TantivyBackend):
"""more_like_this with a user must exclude documents that user cannot see."""
viewer = User.objects.create_user("mlt_viewer")
other = User.objects.create_user("mlt_other")
public_doc = Document.objects.create(
title="Public financial document",
content="quarterly financial analysis report figures",
checksum="MLT3",
pk=52,
)
private_doc = Document.objects.create(
title="Private financial document",
content="quarterly financial analysis report figures",
checksum="MLT4",
pk=53,
owner=other,
)
backend.add_or_update(public_doc)
backend.add_or_update(private_doc)
results = backend.more_like_this(doc_id=52, user=viewer, page=1, page_size=10)
returned_ids = [hit["id"] for hit in results.hits]
# private_doc is owned by other, so viewer cannot see it
assert 53 not in returned_ids
def test_document_not_in_index_returns_empty(self, backend: TantivyBackend):
"""more_like_this for a doc_id absent from the index must return empty results."""
results = backend.more_like_this(doc_id=9999, user=None, page=1, page_size=10)
assert results.hits == []
assert results.total == 0
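The tests above pin down the contract of "more like this": similar documents are returned, the reference document is excluded, and an unknown `doc_id` yields empty results. A toy term-overlap version of that contract in pure Python (Tantivy's actual MLT query is term-frequency based and far more sophisticated; this only mirrors the interface):

```python
def similar_doc_ids(corpus: dict[int, str], doc_id: int, min_shared: int = 2) -> list[int]:
    # Rank other documents by how many distinct terms they share with the
    # reference document. The reference itself is excluded, and an unknown
    # doc_id yields no results, matching the tests above.
    if doc_id not in corpus:
        return []
    ref_terms = set(corpus[doc_id].lower().split())
    scored = []
    for other_id, text in corpus.items():
        if other_id == doc_id:
            continue
        shared = len(ref_terms & set(text.lower().split()))
        if shared >= min_shared:
            scored.append((shared, other_id))
    scored.sort(key=lambda t: (-t[0], t[1]))
    return [other_id for _, other_id in scored]
```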
class TestSingleton:
@@ -462,10 +593,19 @@ class TestFieldHandling:
# Should not raise an exception
backend.add_or_update(doc)
-        assert len(backend.search_ids("test", user=None)) == 1
+        results = backend.search(
+            "test",
+            user=None,
+            page=1,
+            page_size=10,
+            sort_field=None,
+            sort_reverse=False,
+        )
+        assert results.total == 1
def test_custom_fields_include_name_and_value(self, backend: TantivyBackend):
"""Custom fields must be indexed with both field name and value for structured queries."""
# Create a custom field
field = CustomField.objects.create(
name="Invoice Number",
data_type=CustomField.FieldDataType.STRING,
@@ -482,9 +622,18 @@ class TestFieldHandling:
value_text="INV-2024-001",
)
         # Should not raise an exception during indexing
         backend.add_or_update(doc)
-        assert len(backend.search_ids("invoice", user=None)) == 1
+        results = backend.search(
+            "invoice",
+            user=None,
+            page=1,
+            page_size=10,
+            sort_field=None,
+            sort_reverse=False,
+        )
+        assert results.total == 1
def test_select_custom_field_indexes_label_not_id(self, backend: TantivyBackend):
"""SELECT custom fields must index the human-readable label, not the opaque option ID."""
@@ -511,8 +660,27 @@ class TestFieldHandling:
)
backend.add_or_update(doc)
-        assert len(backend.search_ids("custom_fields.value:invoice", user=None)) == 1
-        assert len(backend.search_ids("custom_fields.value:opt_abc", user=None)) == 0
+        # Label should be findable
+        results = backend.search(
+            "custom_fields.value:invoice",
+            user=None,
+            page=1,
+            page_size=10,
+            sort_field=None,
+            sort_reverse=False,
+        )
+        assert results.total == 1
+        # Opaque ID must not appear in the index
+        results = backend.search(
+            "custom_fields.value:opt_abc",
+            user=None,
+            page=1,
+            page_size=10,
+            sort_field=None,
+            sort_reverse=False,
+        )
+        assert results.total == 0
def test_none_custom_field_value_not_indexed(self, backend: TantivyBackend):
"""Custom field instances with no value set must not produce an index entry."""
@@ -534,7 +702,16 @@ class TestFieldHandling:
)
backend.add_or_update(doc)
-        assert len(backend.search_ids("custom_fields.value:none", user=None)) == 0
+        # The string "none" must not appear as an indexed value
+        results = backend.search(
+            "custom_fields.value:none",
+            user=None,
+            page=1,
+            page_size=10,
+            sort_field=None,
+            sort_reverse=False,
+        )
+        assert results.total == 0
def test_notes_include_user_information(self, backend: TantivyBackend):
"""Notes must be indexed with user information when available for structured queries."""
@@ -547,96 +724,32 @@ class TestFieldHandling:
)
Note.objects.create(document=doc, note="Important note", user=user)
-        # Should not raise an exception during indexing
         backend.add_or_update(doc)
-        ids = backend.search_ids("test", user=None)
-        assert len(ids) == 1, (
-            f"Expected 1, got {len(ids)}. Document content should be searchable."
-        )
-        ids = backend.search_ids("notes.note:important", user=None)
-        assert len(ids) == 1, (
-            f"Expected 1, got {len(ids)}. Note content should be searchable via notes.note: prefix."
-        )
+        # Test basic document search first
+        results = backend.search(
+            "test",
+            user=None,
+            page=1,
+            page_size=10,
+            sort_field=None,
+            sort_reverse=False,
+        )
+        assert results.total == 1, (
+            f"Expected 1, got {results.total}. Document content should be searchable."
+        )
+        # Test notes search — must use structured JSON syntax now that note
+        # is no longer in DEFAULT_SEARCH_FIELDS
+        results = backend.search(
+            "notes.note:important",
+            user=None,
+            page=1,
+            page_size=10,
+            sort_field=None,
+            sort_reverse=False,
+        )
+        assert results.total == 1, (
+            f"Expected 1, got {results.total}. Note content should be searchable via notes.note: prefix."
+        )
-class TestHighlightHits:
-    """Test highlight_hits returns proper HTML strings, not raw Snippet objects."""
-    def test_highlights_content_returns_html_string(self, backend: TantivyBackend):
-        """highlight_hits must return HTML strings (from Snippet.to_html()), not Snippet objects."""
-        doc = Document.objects.create(
-            title="Highlight Test",
-            content="The quick brown fox jumps over the lazy dog",
-            checksum="HH1",
-            pk=90,
-        )
-        backend.add_or_update(doc)
-        hits = backend.highlight_hits("quick", [doc.pk])
-        assert len(hits) == 1
-        highlights = hits[0]["highlights"]
-        assert "content" in highlights
-        content_highlight = highlights["content"]
-        assert isinstance(content_highlight, str), (
-            f"Expected str, got {type(content_highlight)}: {content_highlight!r}"
-        )
-        # Tantivy wraps matched terms in <b> tags
-        assert "<b>" in content_highlight, (
-            f"Expected HTML with <b> tags, got: {content_highlight!r}"
-        )
-    def test_highlights_notes_returns_html_string(self, backend: TantivyBackend):
-        """Note highlights must be HTML strings via the notes_text companion field.
-        The notes JSON field does not support tantivy's SnippetGenerator, so
-        the notes_text plain-text field is used instead. The notes.note:
-        prefix targets notes content directly, while the snippet is
-        generated from notes_text, which stores the same text.
-        """
-        user = User.objects.create_user("hl_noteuser")
-        doc = Document.objects.create(
-            title="Doc with matching note",
-            content="unrelated content",
-            checksum="HH2",
-            pk=91,
-        )
-        Note.objects.create(document=doc, note="urgent payment required", user=user)
-        backend.add_or_update(doc)
-        # Use notes.note: prefix so the document matches the query and the
-        # notes_text snippet generator can produce highlights.
-        hits = backend.highlight_hits("notes.note:urgent", [doc.pk])
-        assert len(hits) == 1
-        highlights = hits[0]["highlights"]
-        assert "notes" in highlights
-        note_highlight = highlights["notes"]
-        assert isinstance(note_highlight, str), (
-            f"Expected str, got {type(note_highlight)}: {note_highlight!r}"
-        )
-        assert "<b>" in note_highlight, (
-            f"Expected HTML with <b> tags, got: {note_highlight!r}"
-        )
-    def test_empty_doc_list_returns_empty_hits(self, backend: TantivyBackend):
-        """highlight_hits with no doc IDs must return an empty list."""
-        hits = backend.highlight_hits("anything", [])
-        assert hits == []
-    def test_no_highlights_when_no_match(self, backend: TantivyBackend):
-        """Documents not matching the query should not appear in results."""
-        doc = Document.objects.create(
-            title="Unrelated",
-            content="completely different text",
-            checksum="HH3",
-            pk=92,
-        )
-        backend.add_or_update(doc)
-        hits = backend.highlight_hits("quick", [doc.pk])
-        assert len(hits) == 0

@@ -6,6 +6,8 @@ from unittest.mock import patch
from django.contrib.auth.models import User
from django.core.files.uploadedfile import SimpleUploadedFile
from django.test import override_settings
from PIL import Image
from PIL.PngImagePlugin import PngInfo
from rest_framework import status
from rest_framework.test import APITestCase
@@ -201,6 +203,156 @@ class TestApiAppConfig(DirectoriesMixin, APITestCase):
)
self.assertFalse(Path(old_logo.path).exists())
def test_api_strips_exif_data_from_uploaded_logo(self) -> None:
"""
GIVEN:
- A JPEG logo upload containing EXIF metadata
WHEN:
- Uploaded via PATCH to app config
THEN:
- Stored logo image has EXIF metadata removed
"""
image = Image.new("RGB", (12, 12), "blue")
exif = Image.Exif()
exif[315] = "Paperless Test Author"
logo = BytesIO()
image.save(logo, format="JPEG", exif=exif)
logo.seek(0)
response = self.client.patch(
f"{self.ENDPOINT}1/",
{
"app_logo": SimpleUploadedFile(
name="logo-with-exif.jpg",
content=logo.getvalue(),
content_type="image/jpeg",
),
},
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
config = ApplicationConfiguration.objects.first()
with Image.open(config.app_logo.path) as stored_logo:
stored_exif = stored_logo.getexif()
self.assertEqual(len(stored_exif), 0)
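The test above only checks the outcome (no EXIF left after upload); the project presumably re-encodes the image via Pillow to achieve it. To make the byte-level meaning of "stripping EXIF" concrete, here is a stdlib-only sketch that drops APP1 segments (the marker that carries EXIF) from a JPEG stream. This is illustrative, not Paperless-ngx's implementation:

```python
def strip_app1(jpeg: bytes) -> bytes:
    # Walk the JPEG segment list and drop APP1 (0xFFE1) segments, which
    # carry EXIF payloads; all other segments are copied through unchanged.
    if jpeg[:2] != b"\xff\xd8":
        raise ValueError("not a JPEG stream")
    out = bytearray(jpeg[:2])
    i = 2
    while i + 4 <= len(jpeg) and jpeg[i] == 0xFF:
        marker = jpeg[i + 1]
        if marker == 0xDA:  # start of scan: entropy-coded data follows
            out += jpeg[i:]
            return bytes(out)
        length = int.from_bytes(jpeg[i + 2 : i + 4], "big")
        if marker != 0xE1:
            out += jpeg[i : i + 2 + length]
        i += 2 + length
    return bytes(out)
```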
def test_api_strips_png_metadata_from_uploaded_logo(self) -> None:
"""
GIVEN:
- A PNG logo upload containing text metadata
WHEN:
- Uploaded via PATCH to app config
THEN:
- Stored logo image has metadata removed
"""
image = Image.new("RGB", (12, 12), "green")
pnginfo = PngInfo()
pnginfo.add_text("Author", "Paperless Test Author")
logo = BytesIO()
image.save(logo, format="PNG", pnginfo=pnginfo)
logo.seek(0)
response = self.client.patch(
f"{self.ENDPOINT}1/",
{
"app_logo": SimpleUploadedFile(
name="logo-with-metadata.png",
content=logo.getvalue(),
content_type="image/png",
),
},
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
config = ApplicationConfiguration.objects.first()
with Image.open(config.app_logo.path) as stored_logo:
stored_text = stored_logo.text
self.assertEqual(stored_text, {})
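For PNG, the metadata verified above lives in `tEXt`/`zTXt`/`iTXt` chunks. A stdlib-only sketch of removing those chunks while keeping the rest of the chunk stream intact (again illustrative; the project likely re-encodes via Pillow instead):

```python
import struct
import zlib

PNG_SIG = b"\x89PNG\r\n\x1a\n"

def strip_png_text(png: bytes) -> bytes:
    # Each PNG chunk is: 4-byte length, 4-byte type, data, 4-byte CRC.
    # Copy every chunk except the three textual-metadata chunk types.
    if png[:8] != PNG_SIG:
        raise ValueError("not a PNG stream")
    out = bytearray(PNG_SIG)
    i = 8
    while i + 8 <= len(png):
        (length,) = struct.unpack(">I", png[i : i + 4])
        ctype = png[i + 4 : i + 8]
        end = i + 12 + length
        if ctype not in (b"tEXt", b"zTXt", b"iTXt"):
            out += png[i:end]
        i = end
    return bytes(out)

def make_chunk(ctype: bytes, data: bytes) -> bytes:
    # Helper for building test input: length + type + data + CRC.
    return (
        struct.pack(">I", len(data))
        + ctype
        + data
        + struct.pack(">I", zlib.crc32(ctype + data))
    )
```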
def test_api_accepts_valid_gif_logo(self) -> None:
"""
GIVEN:
- A valid GIF logo upload
WHEN:
- Uploaded via PATCH to app config
THEN:
- Upload succeeds
"""
image = Image.new("RGB", (12, 12), "red")
logo = BytesIO()
image.save(logo, format="GIF", comment=b"Paperless Test Comment")
logo.seek(0)
response = self.client.patch(
f"{self.ENDPOINT}1/",
{
"app_logo": SimpleUploadedFile(
name="logo.gif",
content=logo.getvalue(),
content_type="image/gif",
),
},
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
def test_api_rejects_invalid_raster_logo(self) -> None:
"""
GIVEN:
- A file named as a JPEG but containing non-image payload data
WHEN:
- Uploaded via PATCH to app config
THEN:
- Upload is rejected with 400
"""
response = self.client.patch(
f"{self.ENDPOINT}1/",
{
"app_logo": SimpleUploadedFile(
name="not-an-image.jpg",
content=b"<script>alert('xss')</script>",
content_type="image/jpeg",
),
},
)
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
self.assertIn("invalid logo image", str(response.data).lower())
@override_settings(MAX_IMAGE_PIXELS=100)
def test_api_rejects_logo_exceeding_max_image_pixels(self) -> None:
"""
GIVEN:
- A raster logo larger than the configured MAX_IMAGE_PIXELS limit
WHEN:
- Uploaded via PATCH to app config
THEN:
- Upload is rejected with 400
"""
image = Image.new("RGB", (12, 12), "purple")
logo = BytesIO()
image.save(logo, format="PNG")
logo.seek(0)
response = self.client.patch(
f"{self.ENDPOINT}1/",
{
"app_logo": SimpleUploadedFile(
name="too-large.png",
content=logo.getvalue(),
content_type="image/png",
),
},
)
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
self.assertIn(
"uploaded logo exceeds the maximum allowed image size",
str(response.data).lower(),
)
def test_api_rejects_malicious_svg_logo(self) -> None:
"""
GIVEN:

@@ -18,6 +18,7 @@ from django.contrib.auth.models import Permission
from django.contrib.auth.models import User
from django.core import mail
from django.core.cache import cache
from django.core.files.uploadedfile import SimpleUploadedFile
from django.db import DataError
from django.test import override_settings
from django.utils import timezone
@@ -1377,6 +1378,79 @@ class TestDocumentApi(DirectoriesMixin, DocumentConsumeDelayMixin, APITestCase):
self.assertIsNone(overrides.document_type_id)
self.assertIsNone(overrides.tag_ids)
def test_upload_with_path_traversal_filename_is_reduced_to_basename(self) -> None:
self.consume_file_mock.return_value = celery.result.AsyncResult(
id=str(uuid.uuid4()),
)
payload = SimpleUploadedFile(
"../../outside.pdf",
(Path(__file__).parent / "samples" / "simple.pdf").read_bytes(),
content_type="application/pdf",
)
response = self.client.post(
"/api/documents/post_document/",
{"document": payload},
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.consume_file_mock.assert_called_once()
input_doc, overrides = self.get_last_consume_delay_call_args()
self.assertEqual(input_doc.original_file.name, "outside.pdf")
self.assertEqual(overrides.filename, "outside.pdf")
self.assertNotIn("..", input_doc.original_file.name)
self.assertNotIn("..", overrides.filename)
self.assertTrue(
input_doc.original_file.resolve(strict=False).is_relative_to(
Path(settings.SCRATCH_DIR).resolve(strict=False),
),
)
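Both traversal tests assert that `../../outside.pdf` is reduced to its basename before the file lands under `SCRATCH_DIR`. A stdlib sketch of that sanitization step, handling both `/` and `\` separators (the function name and fallback value are illustrative, not the project's API):

```python
from pathlib import PurePosixPath, PureWindowsPath

def sanitize_upload_filename(name: str, default: str = "upload") -> str:
    # Reduce a client-supplied filename to its final path component.
    # PureWindowsPath treats both separators as such; the PurePosixPath
    # pass is a belt-and-braces second reduction.
    base = PurePosixPath(PureWindowsPath(name).name).name
    if base in ("", ".", ".."):
        return default  # nothing usable left after stripping directories
    return base
```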
def test_upload_with_path_traversal_content_disposition_filename_is_reduced_to_basename(
self,
) -> None:
self.consume_file_mock.return_value = celery.result.AsyncResult(
id=str(uuid.uuid4()),
)
pdf_bytes = (Path(__file__).parent / "samples" / "simple.pdf").read_bytes()
boundary = "paperless-boundary"
payload = (
(
f"--{boundary}\r\n"
'Content-Disposition: form-data; name="document"; '
'filename="../../outside.pdf"\r\n'
"Content-Type: application/pdf\r\n\r\n"
).encode()
+ pdf_bytes
+ f"\r\n--{boundary}--\r\n".encode()
)
response = self.client.generic(
"POST",
"/api/documents/post_document/",
payload,
content_type=f"multipart/form-data; boundary={boundary}",
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.consume_file_mock.assert_called_once()
input_doc, overrides = self.get_last_consume_delay_call_args()
self.assertEqual(input_doc.original_file.name, "outside.pdf")
self.assertEqual(overrides.filename, "outside.pdf")
self.assertNotIn("..", input_doc.original_file.name)
self.assertNotIn("..", overrides.filename)
self.assertTrue(
input_doc.original_file.resolve(strict=False).is_relative_to(
Path(settings.SCRATCH_DIR).resolve(strict=False),
),
)
def test_document_filters_use_latest_version_content(self) -> None:
root = Document.objects.create(
title="versioned root",

@@ -1503,126 +1503,6 @@ class TestDocumentSearchApi(DirectoriesMixin, APITestCase):
[d2.id, d1.id, d3.id],
)
def test_search_ordering_by_score(self) -> None:
"""ordering=-score must return results in descending relevance order (best first)."""
backend = get_backend()
# doc_high has more occurrences of the search term → higher BM25 score
doc_low = Document.objects.create(
title="score sort low",
content="apple",
checksum="SCL1",
)
doc_high = Document.objects.create(
title="score sort high",
content="apple apple apple apple apple",
checksum="SCH1",
)
backend.add_or_update(doc_low)
backend.add_or_update(doc_high)
# -score = descending = best first (highest score)
response = self.client.get("/api/documents/?query=apple&ordering=-score")
self.assertEqual(response.status_code, status.HTTP_200_OK)
ids = [r["id"] for r in response.data["results"]]
self.assertEqual(
ids[0],
doc_high.id,
"Most relevant doc should be first for -score",
)
# score = ascending = worst first (lowest score)
response = self.client.get("/api/documents/?query=apple&ordering=score")
self.assertEqual(response.status_code, status.HTTP_200_OK)
ids = [r["id"] for r in response.data["results"]]
self.assertEqual(
ids[0],
doc_low.id,
"Least relevant doc should be first for +score",
)
def test_search_with_tantivy_native_sort(self) -> None:
"""When ordering by a Tantivy-sortable field, results must be correctly sorted."""
backend = get_backend()
for i, asn in enumerate([30, 10, 20]):
doc = Document.objects.create(
title=f"sortable doc {i}",
content="searchable content",
checksum=f"TNS{i}",
archive_serial_number=asn,
)
backend.add_or_update(doc)
response = self.client.get(
"/api/documents/?query=searchable&ordering=archive_serial_number",
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
asns = [doc["archive_serial_number"] for doc in response.data["results"]]
self.assertEqual(asns, [10, 20, 30])
response = self.client.get(
"/api/documents/?query=searchable&ordering=-archive_serial_number",
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
asns = [doc["archive_serial_number"] for doc in response.data["results"]]
self.assertEqual(asns, [30, 20, 10])
def test_search_page_2_returns_correct_slice(self) -> None:
"""Page 2 must return the second slice, not overlap with page 1."""
backend = get_backend()
for i in range(10):
doc = Document.objects.create(
title=f"doc {i}",
content="paginated content",
checksum=f"PG2{i}",
archive_serial_number=i + 1,
)
backend.add_or_update(doc)
response = self.client.get(
"/api/documents/?query=paginated&ordering=archive_serial_number&page=1&page_size=3",
)
page1_ids = [r["id"] for r in response.data["results"]]
self.assertEqual(len(page1_ids), 3)
response = self.client.get(
"/api/documents/?query=paginated&ordering=archive_serial_number&page=2&page_size=3",
)
page2_ids = [r["id"] for r in response.data["results"]]
self.assertEqual(len(page2_ids), 3)
# No overlap between pages
self.assertEqual(set(page1_ids) & set(page2_ids), set())
# Page 2 ASNs are higher than page 1
page1_asns = [
Document.objects.get(pk=pk).archive_serial_number for pk in page1_ids
]
page2_asns = [
Document.objects.get(pk=pk).archive_serial_number for pk in page2_ids
]
self.assertTrue(max(page1_asns) < min(page2_asns))
def test_search_all_field_contains_all_ids_when_paginated(self) -> None:
"""The 'all' field must contain every matching ID, even when paginated."""
backend = get_backend()
doc_ids = []
for i in range(10):
doc = Document.objects.create(
title=f"all field doc {i}",
content="allfield content",
checksum=f"AF{i}",
)
backend.add_or_update(doc)
doc_ids.append(doc.pk)
response = self.client.get(
"/api/documents/?query=allfield&page=1&page_size=3",
headers={"Accept": "application/json; version=9"},
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(len(response.data["results"]), 3)
# "all" must contain ALL 10 matching IDs
self.assertCountEqual(response.data["all"], doc_ids)
@mock.patch("documents.bulk_edit.bulk_update_documents")
def test_global_search(self, m) -> None:
"""

@@ -1328,7 +1328,7 @@ class PreConsumeTestCase(DirectoriesMixin, GetConsumerMixin, TestCase):
 environment = args[1]
 self.assertEqual(command[0], script.name)
-self.assertEqual(command[1], str(self.test_file))
+self.assertEqual(len(command), 1)
 subset = {
     "DOCUMENT_SOURCE_PATH": str(c.input_doc.original_file),
@@ -1478,11 +1478,7 @@ class PostConsumeTestCase(DirectoriesMixin, GetConsumerMixin, TestCase):
 environment = args[1]
 self.assertEqual(command[0], script.name)
-self.assertEqual(command[1], str(doc.pk))
-self.assertEqual(command[5], f"/api/documents/{doc.pk}/download/")
-self.assertEqual(command[6], f"/api/documents/{doc.pk}/thumb/")
-self.assertEqual(command[7], "my_bank")
-self.assertCountEqual(command[8].split(","), ["a", "b"])
+self.assertEqual(len(command), 1)
 subset = {
     "DOCUMENT_ID": str(doc.pk),


@@ -38,7 +38,6 @@ from django.db.models import Model
 from django.db.models import OuterRef
 from django.db.models import Prefetch
 from django.db.models import Q
-from django.db.models import QuerySet
 from django.db.models import Subquery
 from django.db.models import Sum
 from django.db.models import When
@@ -249,13 +248,6 @@ if settings.AUDIT_LOG_ENABLED:
 logger = logging.getLogger("paperless.api")
-# Crossover point for intersect_and_order: below this count use a targeted
-# IN-clause query; at or above this count fall back to a full-table scan +
-# Python set intersection. The IN-clause is faster for small result sets but
-# degrades on SQLite with thousands of parameters. PostgreSQL handles large IN
-# clauses efficiently, so this threshold mainly protects SQLite users.
-_TANTIVY_INTERSECT_THRESHOLD = 5_000
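The order-preserving intersection the comment above describes can be sketched without the ORM. This is a hypothetical standalone helper, not the codebase's own function:

```python
def intersect_preserving_order(search_ids: list[int], visible_ids: set[int]) -> list[int]:
    # Keep the search backend's ranking; drop anything the ORM filter hid.
    return [doc_id for doc_id in search_ids if doc_id in visible_ids]

ranked = [42, 7, 13, 99]  # best-first IDs from the search index
visible = {7, 99, 42}     # IDs the permission-filtered queryset allows
print(intersect_preserving_order(ranked, visible))  # [42, 7, 99]
```

The set membership test makes each lookup O(1), so the ranked order is preserved at linear cost regardless of which side supplied the candidate IDs.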
class IndexView(TemplateView):
template_name = "index.html"
@@ -2068,16 +2060,19 @@ class UnifiedSearchViewSet(DocumentViewSet):
if not self._is_search_request():
return super().list(request)
from documents.search import SearchHit
from documents.search import SearchMode
from documents.search import TantivyBackend
from documents.search import TantivyRelevanceList
from documents.search import get_backend
def parse_search_params() -> tuple[str | None, bool, bool, int, int]:
"""Extract query string, search mode, and ordering from request."""
active = self._get_active_search_params(request)
if len(active) > 1:
try:
backend = get_backend()
# ORM-filtered queryset: permissions + field filters + ordering (DRF backends applied)
filtered_qs = self.filter_queryset(self.get_queryset())
user = None if request.user.is_superuser else request.user
active_search_params = self._get_active_search_params(request)
if len(active_search_params) > 1:
raise ValidationError(
{
"detail": _(
@@ -2086,161 +2081,73 @@ class UnifiedSearchViewSet(DocumentViewSet):
},
)
ordering_param = request.query_params.get("ordering", "")
sort_reverse = ordering_param.startswith("-")
sort_field_name = ordering_param.lstrip("-") or None
# "score" means relevance order — Tantivy handles it natively,
# so treat it as a Tantivy sort to preserve the ranked order through
# the ORM intersection step.
use_tantivy_sort = (
sort_field_name in TantivyBackend.SORTABLE_FIELDS
or sort_field_name is None
or sort_field_name == "score"
)
try:
page_num = int(request.query_params.get("page", 1))
except (TypeError, ValueError):
page_num = 1
page_size = (
self.paginator.get_page_size(request) or self.paginator.page_size
)
return sort_field_name, sort_reverse, use_tantivy_sort, page_num, page_size
def intersect_and_order(
all_ids: list[int],
filtered_qs: QuerySet[Document],
*,
use_tantivy_sort: bool,
) -> list[int]:
"""Intersect search IDs with ORM-visible IDs, preserving order."""
if not all_ids:
return []
if use_tantivy_sort:
if len(all_ids) <= _TANTIVY_INTERSECT_THRESHOLD:
# Small result set: targeted IN-clause avoids a full-table scan.
visible_ids = set(
filtered_qs.filter(pk__in=all_ids).values_list("pk", flat=True),
)
else:
# Large result set: full-table scan + Python intersection is faster
# than a large IN-clause on SQLite.
visible_ids = set(
filtered_qs.values_list("pk", flat=True),
)
return [doc_id for doc_id in all_ids if doc_id in visible_ids]
return list(
filtered_qs.filter(id__in=all_ids).values_list("pk", flat=True),
)
def run_text_search(
backend: TantivyBackend,
user: User | None,
filtered_qs: QuerySet[Document],
) -> tuple[list[int], list[SearchHit], int]:
"""Handle text/title/query search: IDs, ORM intersection, page highlights."""
if "text" in request.query_params:
search_mode = SearchMode.TEXT
query_str = request.query_params["text"]
elif "title_search" in request.query_params:
search_mode = SearchMode.TITLE
query_str = request.query_params["title_search"]
else:
search_mode = SearchMode.QUERY
query_str = request.query_params["query"]
# "score" is not a real Tantivy sort field — it means relevance order,
# which is Tantivy's default when no sort field is specified.
is_score_sort = sort_field_name == "score"
all_ids = backend.search_ids(
query_str,
user=user,
sort_field=(
None if (not use_tantivy_sort or is_score_sort) else sort_field_name
),
sort_reverse=sort_reverse,
search_mode=search_mode,
)
ordered_ids = intersect_and_order(
all_ids,
filtered_qs,
use_tantivy_sort=use_tantivy_sort,
)
# Tantivy returns relevance results best-first (descending score).
# ordering=score (ascending, worst-first) requires a reversal.
if is_score_sort and not sort_reverse:
ordered_ids = list(reversed(ordered_ids))
page_offset = (page_num - 1) * page_size
page_ids = ordered_ids[page_offset : page_offset + page_size]
page_hits = backend.highlight_hits(
query_str,
page_ids,
search_mode=search_mode,
rank_start=page_offset + 1,
)
return ordered_ids, page_hits, page_offset
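The score-sort reversal above can be illustrated standalone (hypothetical helper; Tantivy itself is not involved):

```python
def apply_score_order(ids: list[int], *, ascending: bool) -> list[int]:
    # A relevance search returns hits best-first (descending score);
    # "ordering=score" without a leading "-" asks for ascending order,
    # so the list must be reversed in that one case.
    return list(reversed(ids)) if ascending else ids

best_first = [10, 20, 30]  # highest score first
print(apply_score_order(best_first, ascending=True))   # [30, 20, 10]
print(apply_score_order(best_first, ascending=False))  # [10, 20, 30]
```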
def run_more_like_this(
backend: TantivyBackend,
user: User | None,
filtered_qs: QuerySet[Document],
) -> tuple[list[int], list[SearchHit], int]:
"""Handle more_like_id search: permission check, IDs, stub hits."""
try:
more_like_doc_id = int(request.query_params["more_like_id"])
more_like_doc = Document.objects.select_related("owner").get(
pk=more_like_doc_id,
)
except (TypeError, ValueError, Document.DoesNotExist):
raise PermissionDenied(_("Invalid more_like_id"))
if not has_perms_owner_aware(
    request.user,
    "view_document",
    more_like_doc,
):
    raise PermissionDenied(_("Insufficient permissions."))
if (
    "text" in request.query_params
    or "title_search" in request.query_params
    or "query" in request.query_params
):
all_ids = backend.more_like_this_ids(more_like_doc_id, user=user)
ordered_ids = intersect_and_order(
all_ids,
filtered_qs,
use_tantivy_sort=True,
)
page_offset = (page_num - 1) * page_size
page_ids = ordered_ids[page_offset : page_offset + page_size]
page_hits = [
SearchHit(id=doc_id, score=0.0, rank=rank, highlights={})
for rank, doc_id in enumerate(page_ids, start=page_offset + 1)
]
return ordered_ids, page_hits, page_offset
try:
sort_field_name, sort_reverse, use_tantivy_sort, page_num, page_size = (
parse_search_params()
)
backend = get_backend()
filtered_qs = self.filter_queryset(self.get_queryset())
user = None if request.user.is_superuser else request.user
if "more_like_id" in request.query_params:
ordered_ids, page_hits, page_offset = run_more_like_this(
backend,
user,
filtered_qs,
)
if "text" in request.query_params:
search_mode = SearchMode.TEXT
query_str = request.query_params["text"]
elif "title_search" in request.query_params:
search_mode = SearchMode.TITLE
query_str = request.query_params["title_search"]
else:
search_mode = SearchMode.QUERY
query_str = request.query_params["query"]
results = backend.search(
query_str,
user=user,
page=1,
page_size=10000,
sort_field=None,
sort_reverse=False,
search_mode=search_mode,
)
else:
ordered_ids, page_hits, page_offset = run_text_search(
backend,
user,
filtered_qs,
)
# more_like_id — validate permission on the seed document first
try:
more_like_doc_id = int(request.query_params["more_like_id"])
more_like_doc = Document.objects.select_related("owner").get(
pk=more_like_doc_id,
)
except (TypeError, ValueError, Document.DoesNotExist):
raise PermissionDenied(_("Invalid more_like_id"))
if not has_perms_owner_aware(
request.user,
"view_document",
more_like_doc,
):
raise PermissionDenied(_("Insufficient permissions."))
results = backend.more_like_this(
more_like_doc_id,
user=user,
page=1,
page_size=10000,
)
rl = TantivyRelevanceList(ordered_ids, page_hits, page_offset)
hits_by_id = {h["id"]: h for h in results.hits}
# Determine sort order: no ordering param -> Tantivy relevance; otherwise -> ORM order
ordering_param = request.query_params.get("ordering", "").lstrip("-")
if not ordering_param:
# Preserve Tantivy relevance order; intersect with ORM-visible IDs
orm_ids = set(filtered_qs.values_list("pk", flat=True))
ordered_hits = [h for h in results.hits if h["id"] in orm_ids]
else:
# Use ORM ordering (already applied by DocumentsOrderingFilter)
hit_ids = set(hits_by_id.keys())
orm_ordered_ids = filtered_qs.filter(id__in=hit_ids).values_list(
"pk",
flat=True,
)
ordered_hits = [
hits_by_id[pk] for pk in orm_ordered_ids if pk in hits_by_id
]
rl = TantivyRelevanceList(ordered_hits)
page = self.paginate_queryset(rl)
if page is not None:
@@ -2250,18 +2157,15 @@ class UnifiedSearchViewSet(DocumentViewSet):
if get_boolean(
str(request.query_params.get("include_selection_data", "false")),
):
# NOTE: pk__in=ordered_ids generates a large SQL IN clause
# for big result sets. Acceptable today but may need a temp
# table or chunked approach if selection_data becomes slow
# at scale (tens of thousands of matching documents).
all_ids = [h["id"] for h in ordered_hits]
response.data["selection_data"] = (
self._get_selection_data_for_queryset(
filtered_qs.filter(pk__in=ordered_ids),
filtered_qs.filter(pk__in=all_ids),
)
)
return response
serializer = self.get_serializer(page_hits, many=True)
serializer = self.get_serializer(ordered_hits, many=True)
return Response(serializer.data)
except NotFound:
@@ -3167,17 +3071,20 @@ class GlobalSearchView(PassUserMixin):
docs = all_docs.filter(title__icontains=query)[:OBJECT_LIMIT]
else:
user = None if request.user.is_superuser else request.user
-matching_ids = get_backend().search_ids(
+fts_results = get_backend().search(
     query,
     user=user,
+    page=1,
+    page_size=1000,
+    sort_field=None,
+    sort_reverse=False,
     search_mode=SearchMode.TEXT,
-    limit=OBJECT_LIMIT * 3,
 )
-docs_by_id = all_docs.in_bulk(matching_ids)
+docs_by_id = all_docs.in_bulk([hit["id"] for hit in fts_results.hits])
 docs = [
-    docs_by_id[doc_id]
-    for doc_id in matching_ids
-    if doc_id in docs_by_id
+    docs_by_id[hit["id"]]
+    for hit in fts_results.hits
+    if hit["id"] in docs_by_id
 ][:OBJECT_LIMIT]
saved_views = (
get_objects_for_user_owner_aware(


@@ -1,4 +1,5 @@
import logging
from io import BytesIO
import magic
from allauth.mfa.adapter import get_adapter as get_mfa_adapter
@@ -11,13 +12,16 @@ from django.contrib.auth.models import Group
from django.contrib.auth.models import Permission
from django.contrib.auth.models import User
from django.contrib.auth.password_validation import validate_password
from django.core.files.uploadedfile import InMemoryUploadedFile
from django.core.files.uploadedfile import UploadedFile
from PIL import Image
from rest_framework import serializers
from rest_framework.authtoken.serializers import AuthTokenSerializer
from paperless.models import ApplicationConfiguration
from paperless.network import validate_outbound_http_url
from paperless.validators import reject_dangerous_svg
from paperless.validators import validate_raster_image
from paperless_mail.serialisers import ObfuscatedPasswordField
logger = logging.getLogger("paperless.settings")
@@ -233,9 +237,40 @@ class ApplicationConfigurationSerializer(serializers.ModelSerializer):
instance.app_logo.delete()
return super().update(instance, validated_data)
def _sanitize_raster_image(self, file: UploadedFile) -> UploadedFile:
try:
data = BytesIO()
image = Image.open(file)
image.save(data, format=image.format)
data.seek(0)
return InMemoryUploadedFile(
file=data,
field_name=file.field_name,
name=file.name,
content_type=file.content_type,
size=data.getbuffer().nbytes,
charset=getattr(file, "charset", None),
)
finally:
image.close()
 def validate_app_logo(self, file: UploadedFile):
-    if file and magic.from_buffer(file.read(2048), mime=True) == "image/svg+xml":
-        reject_dangerous_svg(file)
+    """
+    Validates and sanitizes the uploaded app logo image. Model field already restricts to
+    jpg/png/gif/svg.
+    """
+    if file:
+        mime_type = magic.from_buffer(file.read(2048), mime=True)
+        if mime_type == "image/svg+xml":
+            reject_dangerous_svg(file)
+        else:
+            validate_raster_image(file)
+        if mime_type in {"image/jpeg", "image/png"}:
+            file = self._sanitize_raster_image(file)
     return file
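Content sniffing matters here because a file's extension or declared content type can lie. A rough stdlib-only stand-in for the libmagic check, assuming only the formats the model field allows ever reach this point (the real code uses `magic.from_buffer`):

```python
def sniff_logo_kind(head: bytes) -> str:
    # Minimal magic-byte sniffer covering jpg/png/gif/svg only.
    if head.startswith(b"\x89PNG\r\n\x1a\n"):
        return "image/png"
    if head.startswith(b"\xff\xd8\xff"):
        return "image/jpeg"
    if head.startswith((b"GIF87a", b"GIF89a")):
        return "image/gif"
    if b"<svg" in head[:2048].lower():
        return "image/svg+xml"
    return "application/octet-stream"

print(sniff_logo_kind(b"\x89PNG\r\n\x1a\n" + b"\x00" * 16))  # image/png
print(sniff_logo_kind(b'<svg xmlns="http://www.w3.org/2000/svg"/>'))  # image/svg+xml
```

Dispatching on the sniffed type is what lets the serializer route SVGs to the XML sanitizer and raster formats to the Pillow-based validator.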
def validate_llm_endpoint(self, value: str | None) -> str | None:


@@ -1,6 +1,10 @@
from io import BytesIO
from django.conf import settings
from django.core.exceptions import ValidationError
from django.core.files.uploadedfile import UploadedFile
from lxml import etree
from PIL import Image
ALLOWED_SVG_TAGS: set[str] = {
# Basic shapes
@@ -254,3 +258,30 @@ def reject_dangerous_svg(file: UploadedFile) -> None:
raise ValidationError(
f"URI scheme not allowed in {attr_name}: must be #anchor, relative path, or data:image/*",
)
def validate_raster_image(file: UploadedFile) -> None:
"""
Validates that the uploaded file is a valid raster image (JPEG, PNG, etc.)
and does not exceed maximum pixel limits.
Raises ValidationError if the image is invalid or exceeds the allowed size.
"""
file.seek(0)
image_data = file.read()
try:
with Image.open(BytesIO(image_data)) as image:
image.verify()
if (
settings.MAX_IMAGE_PIXELS is not None
and settings.MAX_IMAGE_PIXELS > 0
and image.width * image.height > settings.MAX_IMAGE_PIXELS
):
raise ValidationError(
"Uploaded logo exceeds the maximum allowed image size.",
)
if image.format is None: # pragma: no cover
raise ValidationError("Invalid logo image.")
except (OSError, Image.DecompressionBombError) as e:
raise ValidationError("Invalid logo image.") from e


@@ -89,7 +89,7 @@ class StandardPagination(PageNumberPagination):
 query = self.page.paginator.object_list
 if isinstance(query, TantivyRelevanceList):
-    return query.get_all_ids()
+    return [h["id"] for h in query._hits]
 return self.page.paginator.object_list.values_list("pk", flat=True)
def get_paginated_response_schema(self, schema):