Compare commits

...

2 Commits

Author SHA1 Message Date
stumpylog 0bef44e108 Experiments with Granian static file serving 2026-05-29 08:14:41 -07:00
Trenton H 525b986e23 Fix: Handle tanvity index lock contention (#12856)
Implements and tests a retry with backoff + jitter for aquring the index update lock.  If we still can't get it, dispatch a celery task to handle it later instead (also with retry)

Signed-off-by: stumpylog <797416+stumpylog@users.noreply.github.com>
2026-05-27 09:47:13 -07:00
7 changed files with 381 additions and 16 deletions
+1 -3
View File
@@ -104,8 +104,6 @@ ARG JBIG2ENC_VERSION=0.30
# Set Python environment variables # Set Python environment variables
ENV PYTHONDONTWRITEBYTECODE=1 \ ENV PYTHONDONTWRITEBYTECODE=1 \
PYTHONUNBUFFERED=1 \ PYTHONUNBUFFERED=1 \
# Ignore warning from Whitenoise about async iterators
PYTHONWARNINGS="ignore:::django.http.response:517" \
PNGX_CONTAINERIZED=1 \ PNGX_CONTAINERIZED=1 \
# https://docs.astral.sh/uv/reference/settings/#link-mode # https://docs.astral.sh/uv/reference/settings/#link-mode
UV_LINK_MODE=copy UV_LINK_MODE=copy
@@ -239,7 +237,7 @@ RUN set -eux \
&& echo "Making fontconfig cache writable for arbitrary container UIDs" \ && echo "Making fontconfig cache writable for arbitrary container UIDs" \
&& chmod 1777 /var/cache/fontconfig \ && chmod 1777 /var/cache/fontconfig \
&& echo "Collecting static files" \ && echo "Collecting static files" \
&& PAPERLESS_SECRET_KEY=build-time-dummy s6-setuidgid paperless python3 manage.py collectstatic --clear --no-input --link \ && PAPERLESS_SECRET_KEY=build-time-dummy s6-setuidgid paperless python3 manage.py collectstatic --clear --no-input \
&& PAPERLESS_SECRET_KEY=build-time-dummy s6-setuidgid paperless python3 manage.py compilemessages \ && PAPERLESS_SECRET_KEY=build-time-dummy s6-setuidgid paperless python3 manage.py compilemessages \
&& /usr/local/bin/deduplicate.py --verbose /usr/src/paperless/static/ && /usr/local/bin/deduplicate.py --verbose /usr/src/paperless/static/
@@ -8,6 +8,13 @@ export GRANIAN_HOST=${GRANIAN_HOST:-${PAPERLESS_BIND_ADDR:-"::"}}
export GRANIAN_PORT=${GRANIAN_PORT:-${PAPERLESS_PORT:-8000}} export GRANIAN_PORT=${GRANIAN_PORT:-${PAPERLESS_PORT:-8000}}
export GRANIAN_WORKERS=${GRANIAN_WORKERS:-${PAPERLESS_WEBSERVER_WORKERS:-1}} export GRANIAN_WORKERS=${GRANIAN_WORKERS:-${PAPERLESS_WEBSERVER_WORKERS:-1}}
# Static file serving: Granian matches against the raw URI path (before any
# SCRIPT_NAME stripping), so the route must include the subpath prefix.
_static_dir="${PAPERLESS_STATICDIR:-/usr/src/paperless/static}"
_static_route="${PAPERLESS_FORCE_SCRIPT_NAME}/static"
export GRANIAN_STATIC_PATH_MOUNT=${GRANIAN_STATIC_PATH_MOUNT:-${_static_dir}}
export GRANIAN_STATIC_PATH_ROUTE=${GRANIAN_STATIC_PATH_ROUTE:-${_static_route:-/static}}
# Only set GRANIAN_URL_PATH_PREFIX if PAPERLESS_FORCE_SCRIPT_NAME is set # Only set GRANIAN_URL_PATH_PREFIX if PAPERLESS_FORCE_SCRIPT_NAME is set
if [[ -n "${PAPERLESS_FORCE_SCRIPT_NAME}" ]]; then if [[ -n "${PAPERLESS_FORCE_SCRIPT_NAME}" ]]; then
export GRANIAN_URL_PATH_PREFIX=${PAPERLESS_FORCE_SCRIPT_NAME} export GRANIAN_URL_PATH_PREFIX=${PAPERLESS_FORCE_SCRIPT_NAME}
+4
View File
@@ -23,6 +23,10 @@ ExecStart=/bin/sh -c '\
[ -n "$PAPERLESS_WEBSERVER_WORKERS" ] && export GRANIAN_WORKERS=$PAPERLESS_WEBSERVER_WORKERS; \ [ -n "$PAPERLESS_WEBSERVER_WORKERS" ] && export GRANIAN_WORKERS=$PAPERLESS_WEBSERVER_WORKERS; \
# URL path prefix: only set if PAPERLESS_FORCE_SCRIPT_NAME exists \ # URL path prefix: only set if PAPERLESS_FORCE_SCRIPT_NAME exists \
[ -n "$PAPERLESS_FORCE_SCRIPT_NAME" ] && export GRANIAN_URL_PATH_PREFIX=$PAPERLESS_FORCE_SCRIPT_NAME; \ [ -n "$PAPERLESS_FORCE_SCRIPT_NAME" ] && export GRANIAN_URL_PATH_PREFIX=$PAPERLESS_FORCE_SCRIPT_NAME; \
# Static file serving: Granian matches the raw URI path (before SCRIPT_NAME stripping), \
# so the route must include any subpath prefix. \
[ -z "$GRANIAN_STATIC_PATH_MOUNT" ] && export GRANIAN_STATIC_PATH_MOUNT=${PAPERLESS_STATICDIR:-/opt/paperless/static}; \
[ -z "$GRANIAN_STATIC_PATH_ROUTE" ] && export GRANIAN_STATIC_PATH_ROUTE="${PAPERLESS_FORCE_SCRIPT_NAME}/static"; \
exec granian --interface asginl --ws --loop uvloop "paperless.asgi:application"' exec granian --interface asginl --ws --loop uvloop "paperless.asgi:application"'
[Install] [Install]
+63 -10
View File
@@ -1,12 +1,15 @@
from __future__ import annotations from __future__ import annotations
import logging import logging
import random
import re import re
import threading import threading
import time
from datetime import UTC from datetime import UTC
from datetime import datetime from datetime import datetime
from enum import StrEnum from enum import StrEnum
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
from typing import Final
from typing import Self from typing import Self
from typing import TypedDict from typing import TypedDict
from typing import TypeVar from typing import TypeVar
@@ -43,6 +46,11 @@ if TYPE_CHECKING:
logger = logging.getLogger("paperless.search") logger = logging.getLogger("paperless.search")
_LOCK_TIMEOUT_SECONDS: Final[float] = 10.0 # per-attempt acquire timeout
_LOCK_RETRY_ATTEMPTS: Final[int] = 4 # total attempts (1 initial + 3 retries)
_LOCK_BACKOFF_BASE: Final[float] = 1.0 # seconds
_LOCK_BACKOFF_CAP: Final[float] = 10.0 # seconds
_WORD_RE = regex.compile(r"\w+") _WORD_RE = regex.compile(r"\w+")
_AUTOCOMPLETE_REGEX_TIMEOUT = 1.0 # seconds; guards against ReDoS on untrusted content _AUTOCOMPLETE_REGEX_TIMEOUT = 1.0 # seconds; guards against ReDoS on untrusted content
@@ -183,12 +191,27 @@ class WriteBatch:
if self._backend._path is not None: if self._backend._path is not None:
lock_path = self._backend._path / ".tantivy.lock" lock_path = self._backend._path / ".tantivy.lock"
self._lock = filelock.FileLock(str(lock_path)) self._lock = filelock.FileLock(str(lock_path))
try: for attempt in range(_LOCK_RETRY_ATTEMPTS):
self._lock.acquire(timeout=self._lock_timeout) try:
except filelock.Timeout as e: # pragma: no cover self._lock.acquire(timeout=self._lock_timeout)
raise SearchIndexLockError( break
f"Could not acquire index lock within {self._lock_timeout}s", except filelock.Timeout:
) from e if attempt == _LOCK_RETRY_ATTEMPTS - 1:
raise SearchIndexLockError(
f"Could not acquire index lock after {_LOCK_RETRY_ATTEMPTS} "
f"attempts (timeout={self._lock_timeout}s each)",
)
sleep_s = random.uniform(
0,
min(_LOCK_BACKOFF_CAP, _LOCK_BACKOFF_BASE * (2**attempt)),
)
logger.debug(
"Index lock contention; retrying in %.2fs (attempt %d/%d)",
sleep_s,
attempt + 1,
_LOCK_RETRY_ATTEMPTS,
)
time.sleep(sleep_s)
self._raw_writer = self._backend._index.writer() self._raw_writer = self._backend._index.writer()
return self return self
@@ -490,13 +513,28 @@ class TantivyBackend:
Convenience method for single-document updates. For bulk operations, Convenience method for single-document updates. For bulk operations,
use batch_update() context manager for better performance. use batch_update() context manager for better performance.
On lock exhaustion after all retry attempts, schedules a deferred
index_document Celery task and returns normally. Callers will NOT
receive a SearchIndexLockError; the index write is deferred silently.
Args: Args:
document: Django Document instance to index document: Django Document instance to index
effective_content: Override document.content for indexing effective_content: Override document.content for indexing
""" """
self._ensure_open() self._ensure_open()
with self.batch_update(lock_timeout=5.0) as batch: try:
batch.add_or_update(document, effective_content) with self.batch_update(lock_timeout=_LOCK_TIMEOUT_SECONDS) as batch:
batch.add_or_update(document, effective_content)
except SearchIndexLockError:
logger.error(
"Search index lock exhausted for document %d after %d attempts; "
"scheduling deferred index write",
document.pk,
_LOCK_RETRY_ATTEMPTS,
)
from documents.tasks import index_document
index_document.apply_async(args=[document.pk], countdown=60)
def remove(self, doc_id: int) -> None: def remove(self, doc_id: int) -> None:
""" """
@@ -505,12 +543,27 @@ class TantivyBackend:
Convenience method for single-document removal. For bulk operations, Convenience method for single-document removal. For bulk operations,
use batch_update() context manager for better performance. use batch_update() context manager for better performance.
On lock exhaustion after all retry attempts, schedules a deferred
remove_document_from_index Celery task and returns normally.
Callers will NOT receive a SearchIndexLockError.
Args: Args:
doc_id: Primary key of the document to remove doc_id: Primary key of the document to remove
""" """
self._ensure_open() self._ensure_open()
with self.batch_update(lock_timeout=5.0) as batch: try:
batch.remove(doc_id) with self.batch_update(lock_timeout=_LOCK_TIMEOUT_SECONDS) as batch:
batch.remove(doc_id)
except SearchIndexLockError:
logger.error(
"Search index lock exhausted for doc_id %d after %d attempts; "
"scheduling deferred index removal",
doc_id,
_LOCK_RETRY_ATTEMPTS,
)
from documents.tasks import remove_document_from_index
remove_document_from_index.apply_async(args=[doc_id], countdown=60)
def highlight_hits( def highlight_hits(
self, self,
+58
View File
@@ -56,6 +56,7 @@ from documents.plugins.base import StopConsumeTaskError
from documents.plugins.helpers import ProgressManager from documents.plugins.helpers import ProgressManager
from documents.plugins.helpers import ProgressStatusOptions from documents.plugins.helpers import ProgressStatusOptions
from documents.sanity_checker import SanityCheckFailedException from documents.sanity_checker import SanityCheckFailedException
from documents.search._backend import SearchIndexLockError
from documents.signals import document_updated from documents.signals import document_updated
from documents.signals.handlers import cleanup_document_deletion from documents.signals.handlers import cleanup_document_deletion
from documents.signals.handlers import run_workflows from documents.signals.handlers import run_workflows
@@ -84,6 +85,63 @@ def index_optimize() -> None:
) )
@shared_task(
bind=True,
ignore_result=True,
autoretry_for=(SearchIndexLockError,),
max_retries=5,
retry_backoff=60,
retry_jitter=True,
)
def index_document(self, document_id: int) -> None:
"""
Deferred single-document index write.
Used as a self-healing fallback when add_or_update() exhausts its lock retry
budget during high-concurrency consumption. Runs via batch_update() directly
to avoid re-entering the deferred scheduling path in add_or_update().
If the document was deleted before this task runs, it exits cleanly.
"""
from documents.search import get_backend
try:
document = Document.objects.get(pk=document_id)
except Document.DoesNotExist:
logger.info(
"index_document: document %d no longer exists; skipping",
document_id,
)
return
with get_backend().batch_update() as batch:
batch.add_or_update(
document,
effective_content=document.get_effective_content(),
)
@shared_task(
bind=True,
ignore_result=True,
autoretry_for=(SearchIndexLockError,),
max_retries=5,
retry_backoff=60,
retry_jitter=True,
)
def remove_document_from_index(self, doc_id: int) -> None:
"""
Deferred single-document index removal.
Used as a self-healing fallback when remove() exhausts its lock retry budget.
Operates only on the Tantivy index; no database lookup required.
If the document has already been removed, the term-query delete is a no-op.
"""
from documents.search import get_backend
with get_backend().batch_update() as batch:
batch.remove(doc_id)
@shared_task @shared_task
def train_classifier( def train_classifier(
*, *,
@@ -0,0 +1,248 @@
"""Tests for search index lock backoff, retry logic, and self-healing deferred tasks."""
from __future__ import annotations
import logging
from typing import TYPE_CHECKING
import filelock
import pytest
from documents.search._backend import _LOCK_BACKOFF_CAP
from documents.search._backend import _LOCK_RETRY_ATTEMPTS
from documents.search._backend import _LOCK_TIMEOUT_SECONDS
from documents.search._backend import SearchIndexLockError
from documents.search._backend import TantivyBackend
from documents.tasks import index_document
from documents.tasks import remove_document_from_index
from documents.tests.factories import DocumentFactory
if TYPE_CHECKING:
from collections.abc import Generator
from pathlib import Path
from pytest_mock import MockerFixture
pytestmark = pytest.mark.search
@pytest.fixture
def disk_backend(tmp_path: Path) -> Generator[TantivyBackend, None, None]:
"""On-disk TantivyBackend so the file-lock code path is exercised."""
b = TantivyBackend(path=tmp_path)
b.open()
try:
yield b
finally:
b.close()
class TestWriteBatchLockRetry:
"""Test WriteBatch retry loop with backoff + full jitter."""
@pytest.mark.django_db
def test_lock_retries_then_succeeds(
self,
disk_backend: TantivyBackend,
mocker: MockerFixture,
) -> None:
"""Timeout on first 3 attempts then success on 4th — document must be indexed."""
doc = DocumentFactory()
acquire_calls = 0
def flaky_acquire(timeout: float) -> None:
nonlocal acquire_calls
acquire_calls += 1
# Raise Timeout for first _LOCK_RETRY_ATTEMPTS - 1 calls, succeed on last
if acquire_calls < _LOCK_RETRY_ATTEMPTS:
raise filelock.Timeout("")
sleep_values: list[float] = []
mocker.patch(
"documents.search._backend.filelock.FileLock.acquire",
side_effect=flaky_acquire,
)
mock_sleep = mocker.patch(
"documents.search._backend.time.sleep",
side_effect=lambda s: sleep_values.append(s),
)
# Should not raise — 4th attempt succeeds
with disk_backend.batch_update(lock_timeout=_LOCK_TIMEOUT_SECONDS) as batch:
batch.add_or_update(doc)
# sleep called exactly _LOCK_RETRY_ATTEMPTS - 1 times (once per failed attempt)
assert mock_sleep.call_count == _LOCK_RETRY_ATTEMPTS - 1
# All sleep values must be in [0, _LOCK_BACKOFF_CAP]
for s in sleep_values:
assert 0 <= s <= _LOCK_BACKOFF_CAP, (
f"Sleep value {s} outside [0, {_LOCK_BACKOFF_CAP}]"
)
def test_lock_exhaustion_raises_search_index_lock_error(
self,
disk_backend: TantivyBackend,
mocker: MockerFixture,
) -> None:
"""All acquire attempts raise Timeout — WriteBatch must raise SearchIndexLockError."""
mocker.patch(
"documents.search._backend.filelock.FileLock.acquire",
side_effect=filelock.Timeout(""),
)
mocker.patch("documents.search._backend.time.sleep")
with pytest.raises(SearchIndexLockError):
with disk_backend.batch_update(lock_timeout=_LOCK_TIMEOUT_SECONDS):
pass
def test_jitter_values_in_range(
self,
disk_backend: TantivyBackend,
mocker: MockerFixture,
) -> None:
"""Sleep values must always lie in [0, _LOCK_BACKOFF_CAP] across many samples."""
mocker.patch(
"documents.search._backend.filelock.FileLock.acquire",
side_effect=filelock.Timeout(""),
)
sleep_values: list[float] = []
mocker.patch(
"documents.search._backend.time.sleep",
side_effect=lambda s: sleep_values.append(s),
)
for _ in range(50):
sleep_values.clear()
with pytest.raises(SearchIndexLockError):
with disk_backend.batch_update(lock_timeout=_LOCK_TIMEOUT_SECONDS):
pass
for s in sleep_values:
assert 0 <= s <= _LOCK_BACKOFF_CAP, (
f"Jitter {s} exceeds cap {_LOCK_BACKOFF_CAP}"
)
class TestAddOrUpdateDeferredScheduling:
"""Test that add_or_update() and remove() defer to Celery on lock exhaustion."""
@pytest.mark.django_db
def test_lock_exhaustion_schedules_deferred_task(
self,
disk_backend: TantivyBackend,
mocker: MockerFixture,
) -> None:
"""Lock exhaustion in add_or_update must schedule index_document task, not raise."""
doc = DocumentFactory()
mocker.patch(
"documents.search._backend.filelock.FileLock.acquire",
side_effect=filelock.Timeout(""),
)
mocker.patch("documents.search._backend.time.sleep")
mock_apply = mocker.patch("documents.tasks.index_document.apply_async")
# Must NOT raise
disk_backend.add_or_update(doc)
mock_apply.assert_called_once_with(args=[doc.pk], countdown=60)
def test_remove_exhaustion_schedules_deferred_task(
self,
disk_backend: TantivyBackend,
mocker: MockerFixture,
) -> None:
"""Lock exhaustion in remove() must schedule remove_document_from_index task, not raise."""
doc_id = 503
mocker.patch(
"documents.search._backend.filelock.FileLock.acquire",
side_effect=filelock.Timeout(""),
)
mocker.patch("documents.search._backend.time.sleep")
mock_apply = mocker.patch(
"documents.tasks.remove_document_from_index.apply_async",
)
# Must NOT raise
disk_backend.remove(doc_id)
mock_apply.assert_called_once_with(args=[doc_id], countdown=60)
@pytest.mark.django_db
class TestIndexDocumentTask:
"""Test the deferred index_document and remove_document_from_index Celery tasks."""
def test_index_document_task_skips_deleted_document(
self,
caplog: pytest.LogCaptureFixture,
) -> None:
"""index_document with a non-existent doc_id must return cleanly and log INFO."""
nonexistent_id = 999999
with caplog.at_level(logging.INFO, logger="paperless.tasks"):
index_document(nonexistent_id)
assert any("no longer exists" in record.message for record in caplog.records), (
"Expected INFO log about missing document"
)
def test_index_document_task_indexes_existing_document(
self,
backend: TantivyBackend,
mocker: MockerFixture,
) -> None:
"""index_document task must add the document to the index via batch_update."""
doc = DocumentFactory(content="via deferred task")
# get_backend is imported lazily inside the task: `from documents.search import get_backend`
mocker.patch(
"documents.search.get_backend",
return_value=backend,
)
index_document(doc.pk)
ids = backend.search_ids("deferred task", user=None)
assert doc.pk in ids
def test_remove_document_from_index_task_removes_existing_document(
self,
backend: TantivyBackend,
mocker: MockerFixture,
) -> None:
"""remove_document_from_index task must remove the document from the index."""
doc = DocumentFactory(content="will be removed by deferred task")
backend.add_or_update(doc)
assert doc.pk in backend.search_ids("removed", user=None)
mocker.patch("documents.search.get_backend", return_value=backend)
remove_document_from_index(doc.pk)
assert doc.pk not in backend.search_ids("removed", user=None)
def test_task_does_not_swallow_lock_error(
self,
mocker: MockerFixture,
) -> None:
"""Verifies the task body propagates SearchIndexLockError so Celery's
autoretry_for can catch it (rather than the task swallowing the error
and silently succeeding)."""
doc = DocumentFactory()
mock_batch = mocker.MagicMock()
mock_batch.__enter__ = mocker.MagicMock(
side_effect=SearchIndexLockError("exhausted"),
)
mock_batch.__exit__ = mocker.MagicMock(return_value=False)
mock_backend = mocker.MagicMock()
mock_backend.batch_update.return_value = mock_batch
# get_backend is imported lazily inside the task: `from documents.search import get_backend`
mocker.patch("documents.search.get_backend", return_value=mock_backend)
with pytest.raises(SearchIndexLockError):
index_document(doc.pk)
-3
View File
@@ -118,7 +118,6 @@ SCRATCH_DIR = get_path_from_env(
env_apps = get_list_from_env("PAPERLESS_APPS") env_apps = get_list_from_env("PAPERLESS_APPS")
INSTALLED_APPS = [ INSTALLED_APPS = [
"whitenoise.runserver_nostatic",
"django.contrib.auth", "django.contrib.auth",
"django.contrib.contenttypes", "django.contrib.contenttypes",
"django.contrib.sessions", "django.contrib.sessions",
@@ -173,7 +172,6 @@ if DEBUG:
MIDDLEWARE = [ MIDDLEWARE = [
"django.middleware.security.SecurityMiddleware", "django.middleware.security.SecurityMiddleware",
"whitenoise.middleware.WhiteNoiseMiddleware",
"django.contrib.sessions.middleware.SessionMiddleware", "django.contrib.sessions.middleware.SessionMiddleware",
"corsheaders.middleware.CorsMiddleware", "corsheaders.middleware.CorsMiddleware",
"django.middleware.locale.LocaleMiddleware", "django.middleware.locale.LocaleMiddleware",
@@ -232,7 +230,6 @@ WSGI_APPLICATION = "paperless.wsgi.application"
ASGI_APPLICATION = "paperless.asgi.application" ASGI_APPLICATION = "paperless.asgi.application"
STATIC_URL = os.getenv("PAPERLESS_STATIC_URL", BASE_URL + "static/") STATIC_URL = os.getenv("PAPERLESS_STATIC_URL", BASE_URL + "static/")
WHITENOISE_STATIC_PREFIX = "/static/"
STORAGES = { STORAGES = {
"staticfiles": { "staticfiles": {