mirror of
https://github.com/paperless-ngx/paperless-ngx.git
synced 2026-05-30 18:35:30 +00:00
Fix: Handle tanvity index lock contention (#12856)
Implements and tests a retry with backoff + jitter for aquring the index update lock. If we still can't get it, dispatch a celery task to handle it later instead (also with retry) Signed-off-by: stumpylog <797416+stumpylog@users.noreply.github.com>
This commit is contained in:
@@ -1,12 +1,15 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import random
|
||||
import re
|
||||
import threading
|
||||
import time
|
||||
from datetime import UTC
|
||||
from datetime import datetime
|
||||
from enum import StrEnum
|
||||
from typing import TYPE_CHECKING
|
||||
from typing import Final
|
||||
from typing import Self
|
||||
from typing import TypedDict
|
||||
from typing import TypeVar
|
||||
@@ -43,6 +46,11 @@ if TYPE_CHECKING:
|
||||
|
||||
logger = logging.getLogger("paperless.search")
|
||||
|
||||
_LOCK_TIMEOUT_SECONDS: Final[float] = 10.0 # per-attempt acquire timeout
|
||||
_LOCK_RETRY_ATTEMPTS: Final[int] = 4 # total attempts (1 initial + 3 retries)
|
||||
_LOCK_BACKOFF_BASE: Final[float] = 1.0 # seconds
|
||||
_LOCK_BACKOFF_CAP: Final[float] = 10.0 # seconds
|
||||
|
||||
_WORD_RE = regex.compile(r"\w+")
|
||||
_AUTOCOMPLETE_REGEX_TIMEOUT = 1.0 # seconds; guards against ReDoS on untrusted content
|
||||
|
||||
@@ -183,12 +191,27 @@ class WriteBatch:
|
||||
if self._backend._path is not None:
|
||||
lock_path = self._backend._path / ".tantivy.lock"
|
||||
self._lock = filelock.FileLock(str(lock_path))
|
||||
try:
|
||||
self._lock.acquire(timeout=self._lock_timeout)
|
||||
except filelock.Timeout as e: # pragma: no cover
|
||||
raise SearchIndexLockError(
|
||||
f"Could not acquire index lock within {self._lock_timeout}s",
|
||||
) from e
|
||||
for attempt in range(_LOCK_RETRY_ATTEMPTS):
|
||||
try:
|
||||
self._lock.acquire(timeout=self._lock_timeout)
|
||||
break
|
||||
except filelock.Timeout:
|
||||
if attempt == _LOCK_RETRY_ATTEMPTS - 1:
|
||||
raise SearchIndexLockError(
|
||||
f"Could not acquire index lock after {_LOCK_RETRY_ATTEMPTS} "
|
||||
f"attempts (timeout={self._lock_timeout}s each)",
|
||||
)
|
||||
sleep_s = random.uniform(
|
||||
0,
|
||||
min(_LOCK_BACKOFF_CAP, _LOCK_BACKOFF_BASE * (2**attempt)),
|
||||
)
|
||||
logger.debug(
|
||||
"Index lock contention; retrying in %.2fs (attempt %d/%d)",
|
||||
sleep_s,
|
||||
attempt + 1,
|
||||
_LOCK_RETRY_ATTEMPTS,
|
||||
)
|
||||
time.sleep(sleep_s)
|
||||
|
||||
self._raw_writer = self._backend._index.writer()
|
||||
return self
|
||||
@@ -490,13 +513,28 @@ class TantivyBackend:
|
||||
Convenience method for single-document updates. For bulk operations,
|
||||
use batch_update() context manager for better performance.
|
||||
|
||||
On lock exhaustion after all retry attempts, schedules a deferred
|
||||
index_document Celery task and returns normally. Callers will NOT
|
||||
receive a SearchIndexLockError; the index write is deferred silently.
|
||||
|
||||
Args:
|
||||
document: Django Document instance to index
|
||||
effective_content: Override document.content for indexing
|
||||
"""
|
||||
self._ensure_open()
|
||||
with self.batch_update(lock_timeout=5.0) as batch:
|
||||
batch.add_or_update(document, effective_content)
|
||||
try:
|
||||
with self.batch_update(lock_timeout=_LOCK_TIMEOUT_SECONDS) as batch:
|
||||
batch.add_or_update(document, effective_content)
|
||||
except SearchIndexLockError:
|
||||
logger.error(
|
||||
"Search index lock exhausted for document %d after %d attempts; "
|
||||
"scheduling deferred index write",
|
||||
document.pk,
|
||||
_LOCK_RETRY_ATTEMPTS,
|
||||
)
|
||||
from documents.tasks import index_document
|
||||
|
||||
index_document.apply_async(args=[document.pk], countdown=60)
|
||||
|
||||
def remove(self, doc_id: int) -> None:
|
||||
"""
|
||||
@@ -505,12 +543,27 @@ class TantivyBackend:
|
||||
Convenience method for single-document removal. For bulk operations,
|
||||
use batch_update() context manager for better performance.
|
||||
|
||||
On lock exhaustion after all retry attempts, schedules a deferred
|
||||
remove_document_from_index Celery task and returns normally.
|
||||
Callers will NOT receive a SearchIndexLockError.
|
||||
|
||||
Args:
|
||||
doc_id: Primary key of the document to remove
|
||||
"""
|
||||
self._ensure_open()
|
||||
with self.batch_update(lock_timeout=5.0) as batch:
|
||||
batch.remove(doc_id)
|
||||
try:
|
||||
with self.batch_update(lock_timeout=_LOCK_TIMEOUT_SECONDS) as batch:
|
||||
batch.remove(doc_id)
|
||||
except SearchIndexLockError:
|
||||
logger.error(
|
||||
"Search index lock exhausted for doc_id %d after %d attempts; "
|
||||
"scheduling deferred index removal",
|
||||
doc_id,
|
||||
_LOCK_RETRY_ATTEMPTS,
|
||||
)
|
||||
from documents.tasks import remove_document_from_index
|
||||
|
||||
remove_document_from_index.apply_async(args=[doc_id], countdown=60)
|
||||
|
||||
def highlight_hits(
|
||||
self,
|
||||
|
||||
@@ -56,6 +56,7 @@ from documents.plugins.base import StopConsumeTaskError
|
||||
from documents.plugins.helpers import ProgressManager
|
||||
from documents.plugins.helpers import ProgressStatusOptions
|
||||
from documents.sanity_checker import SanityCheckFailedException
|
||||
from documents.search._backend import SearchIndexLockError
|
||||
from documents.signals import document_updated
|
||||
from documents.signals.handlers import cleanup_document_deletion
|
||||
from documents.signals.handlers import run_workflows
|
||||
@@ -84,6 +85,63 @@ def index_optimize() -> None:
|
||||
)
|
||||
|
||||
|
||||
@shared_task(
|
||||
bind=True,
|
||||
ignore_result=True,
|
||||
autoretry_for=(SearchIndexLockError,),
|
||||
max_retries=5,
|
||||
retry_backoff=60,
|
||||
retry_jitter=True,
|
||||
)
|
||||
def index_document(self, document_id: int) -> None:
|
||||
"""
|
||||
Deferred single-document index write.
|
||||
|
||||
Used as a self-healing fallback when add_or_update() exhausts its lock retry
|
||||
budget during high-concurrency consumption. Runs via batch_update() directly
|
||||
to avoid re-entering the deferred scheduling path in add_or_update().
|
||||
|
||||
If the document was deleted before this task runs, it exits cleanly.
|
||||
"""
|
||||
from documents.search import get_backend
|
||||
|
||||
try:
|
||||
document = Document.objects.get(pk=document_id)
|
||||
except Document.DoesNotExist:
|
||||
logger.info(
|
||||
"index_document: document %d no longer exists; skipping",
|
||||
document_id,
|
||||
)
|
||||
return
|
||||
with get_backend().batch_update() as batch:
|
||||
batch.add_or_update(
|
||||
document,
|
||||
effective_content=document.get_effective_content(),
|
||||
)
|
||||
|
||||
|
||||
@shared_task(
|
||||
bind=True,
|
||||
ignore_result=True,
|
||||
autoretry_for=(SearchIndexLockError,),
|
||||
max_retries=5,
|
||||
retry_backoff=60,
|
||||
retry_jitter=True,
|
||||
)
|
||||
def remove_document_from_index(self, doc_id: int) -> None:
|
||||
"""
|
||||
Deferred single-document index removal.
|
||||
|
||||
Used as a self-healing fallback when remove() exhausts its lock retry budget.
|
||||
Operates only on the Tantivy index; no database lookup required.
|
||||
If the document has already been removed, the term-query delete is a no-op.
|
||||
"""
|
||||
from documents.search import get_backend
|
||||
|
||||
with get_backend().batch_update() as batch:
|
||||
batch.remove(doc_id)
|
||||
|
||||
|
||||
@shared_task
|
||||
def train_classifier(
|
||||
*,
|
||||
|
||||
@@ -0,0 +1,248 @@
|
||||
"""Tests for search index lock backoff, retry logic, and self-healing deferred tasks."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
import filelock
|
||||
import pytest
|
||||
|
||||
from documents.search._backend import _LOCK_BACKOFF_CAP
|
||||
from documents.search._backend import _LOCK_RETRY_ATTEMPTS
|
||||
from documents.search._backend import _LOCK_TIMEOUT_SECONDS
|
||||
from documents.search._backend import SearchIndexLockError
|
||||
from documents.search._backend import TantivyBackend
|
||||
from documents.tasks import index_document
|
||||
from documents.tasks import remove_document_from_index
|
||||
from documents.tests.factories import DocumentFactory
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from collections.abc import Generator
|
||||
from pathlib import Path
|
||||
|
||||
from pytest_mock import MockerFixture
|
||||
|
||||
pytestmark = pytest.mark.search
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def disk_backend(tmp_path: Path) -> Generator[TantivyBackend, None, None]:
|
||||
"""On-disk TantivyBackend so the file-lock code path is exercised."""
|
||||
b = TantivyBackend(path=tmp_path)
|
||||
b.open()
|
||||
try:
|
||||
yield b
|
||||
finally:
|
||||
b.close()
|
||||
|
||||
|
||||
class TestWriteBatchLockRetry:
|
||||
"""Test WriteBatch retry loop with backoff + full jitter."""
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_lock_retries_then_succeeds(
|
||||
self,
|
||||
disk_backend: TantivyBackend,
|
||||
mocker: MockerFixture,
|
||||
) -> None:
|
||||
"""Timeout on first 3 attempts then success on 4th — document must be indexed."""
|
||||
doc = DocumentFactory()
|
||||
|
||||
acquire_calls = 0
|
||||
|
||||
def flaky_acquire(timeout: float) -> None:
|
||||
nonlocal acquire_calls
|
||||
acquire_calls += 1
|
||||
# Raise Timeout for first _LOCK_RETRY_ATTEMPTS - 1 calls, succeed on last
|
||||
if acquire_calls < _LOCK_RETRY_ATTEMPTS:
|
||||
raise filelock.Timeout("")
|
||||
|
||||
sleep_values: list[float] = []
|
||||
|
||||
mocker.patch(
|
||||
"documents.search._backend.filelock.FileLock.acquire",
|
||||
side_effect=flaky_acquire,
|
||||
)
|
||||
mock_sleep = mocker.patch(
|
||||
"documents.search._backend.time.sleep",
|
||||
side_effect=lambda s: sleep_values.append(s),
|
||||
)
|
||||
|
||||
# Should not raise — 4th attempt succeeds
|
||||
with disk_backend.batch_update(lock_timeout=_LOCK_TIMEOUT_SECONDS) as batch:
|
||||
batch.add_or_update(doc)
|
||||
|
||||
# sleep called exactly _LOCK_RETRY_ATTEMPTS - 1 times (once per failed attempt)
|
||||
assert mock_sleep.call_count == _LOCK_RETRY_ATTEMPTS - 1
|
||||
|
||||
# All sleep values must be in [0, _LOCK_BACKOFF_CAP]
|
||||
for s in sleep_values:
|
||||
assert 0 <= s <= _LOCK_BACKOFF_CAP, (
|
||||
f"Sleep value {s} outside [0, {_LOCK_BACKOFF_CAP}]"
|
||||
)
|
||||
|
||||
def test_lock_exhaustion_raises_search_index_lock_error(
|
||||
self,
|
||||
disk_backend: TantivyBackend,
|
||||
mocker: MockerFixture,
|
||||
) -> None:
|
||||
"""All acquire attempts raise Timeout — WriteBatch must raise SearchIndexLockError."""
|
||||
mocker.patch(
|
||||
"documents.search._backend.filelock.FileLock.acquire",
|
||||
side_effect=filelock.Timeout(""),
|
||||
)
|
||||
mocker.patch("documents.search._backend.time.sleep")
|
||||
|
||||
with pytest.raises(SearchIndexLockError):
|
||||
with disk_backend.batch_update(lock_timeout=_LOCK_TIMEOUT_SECONDS):
|
||||
pass
|
||||
|
||||
def test_jitter_values_in_range(
|
||||
self,
|
||||
disk_backend: TantivyBackend,
|
||||
mocker: MockerFixture,
|
||||
) -> None:
|
||||
"""Sleep values must always lie in [0, _LOCK_BACKOFF_CAP] across many samples."""
|
||||
mocker.patch(
|
||||
"documents.search._backend.filelock.FileLock.acquire",
|
||||
side_effect=filelock.Timeout(""),
|
||||
)
|
||||
sleep_values: list[float] = []
|
||||
mocker.patch(
|
||||
"documents.search._backend.time.sleep",
|
||||
side_effect=lambda s: sleep_values.append(s),
|
||||
)
|
||||
for _ in range(50):
|
||||
sleep_values.clear()
|
||||
with pytest.raises(SearchIndexLockError):
|
||||
with disk_backend.batch_update(lock_timeout=_LOCK_TIMEOUT_SECONDS):
|
||||
pass
|
||||
|
||||
for s in sleep_values:
|
||||
assert 0 <= s <= _LOCK_BACKOFF_CAP, (
|
||||
f"Jitter {s} exceeds cap {_LOCK_BACKOFF_CAP}"
|
||||
)
|
||||
|
||||
|
||||
class TestAddOrUpdateDeferredScheduling:
|
||||
"""Test that add_or_update() and remove() defer to Celery on lock exhaustion."""
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_lock_exhaustion_schedules_deferred_task(
|
||||
self,
|
||||
disk_backend: TantivyBackend,
|
||||
mocker: MockerFixture,
|
||||
) -> None:
|
||||
"""Lock exhaustion in add_or_update must schedule index_document task, not raise."""
|
||||
doc = DocumentFactory()
|
||||
|
||||
mocker.patch(
|
||||
"documents.search._backend.filelock.FileLock.acquire",
|
||||
side_effect=filelock.Timeout(""),
|
||||
)
|
||||
mocker.patch("documents.search._backend.time.sleep")
|
||||
mock_apply = mocker.patch("documents.tasks.index_document.apply_async")
|
||||
|
||||
# Must NOT raise
|
||||
disk_backend.add_or_update(doc)
|
||||
|
||||
mock_apply.assert_called_once_with(args=[doc.pk], countdown=60)
|
||||
|
||||
def test_remove_exhaustion_schedules_deferred_task(
|
||||
self,
|
||||
disk_backend: TantivyBackend,
|
||||
mocker: MockerFixture,
|
||||
) -> None:
|
||||
"""Lock exhaustion in remove() must schedule remove_document_from_index task, not raise."""
|
||||
doc_id = 503
|
||||
|
||||
mocker.patch(
|
||||
"documents.search._backend.filelock.FileLock.acquire",
|
||||
side_effect=filelock.Timeout(""),
|
||||
)
|
||||
mocker.patch("documents.search._backend.time.sleep")
|
||||
mock_apply = mocker.patch(
|
||||
"documents.tasks.remove_document_from_index.apply_async",
|
||||
)
|
||||
|
||||
# Must NOT raise
|
||||
disk_backend.remove(doc_id)
|
||||
|
||||
mock_apply.assert_called_once_with(args=[doc_id], countdown=60)
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
class TestIndexDocumentTask:
|
||||
"""Test the deferred index_document and remove_document_from_index Celery tasks."""
|
||||
|
||||
def test_index_document_task_skips_deleted_document(
|
||||
self,
|
||||
caplog: pytest.LogCaptureFixture,
|
||||
) -> None:
|
||||
"""index_document with a non-existent doc_id must return cleanly and log INFO."""
|
||||
nonexistent_id = 999999
|
||||
|
||||
with caplog.at_level(logging.INFO, logger="paperless.tasks"):
|
||||
index_document(nonexistent_id)
|
||||
|
||||
assert any("no longer exists" in record.message for record in caplog.records), (
|
||||
"Expected INFO log about missing document"
|
||||
)
|
||||
|
||||
def test_index_document_task_indexes_existing_document(
|
||||
self,
|
||||
backend: TantivyBackend,
|
||||
mocker: MockerFixture,
|
||||
) -> None:
|
||||
"""index_document task must add the document to the index via batch_update."""
|
||||
doc = DocumentFactory(content="via deferred task")
|
||||
|
||||
# get_backend is imported lazily inside the task: `from documents.search import get_backend`
|
||||
mocker.patch(
|
||||
"documents.search.get_backend",
|
||||
return_value=backend,
|
||||
)
|
||||
index_document(doc.pk)
|
||||
|
||||
ids = backend.search_ids("deferred task", user=None)
|
||||
assert doc.pk in ids
|
||||
|
||||
def test_remove_document_from_index_task_removes_existing_document(
|
||||
self,
|
||||
backend: TantivyBackend,
|
||||
mocker: MockerFixture,
|
||||
) -> None:
|
||||
"""remove_document_from_index task must remove the document from the index."""
|
||||
doc = DocumentFactory(content="will be removed by deferred task")
|
||||
backend.add_or_update(doc)
|
||||
assert doc.pk in backend.search_ids("removed", user=None)
|
||||
|
||||
mocker.patch("documents.search.get_backend", return_value=backend)
|
||||
remove_document_from_index(doc.pk)
|
||||
|
||||
assert doc.pk not in backend.search_ids("removed", user=None)
|
||||
|
||||
def test_task_does_not_swallow_lock_error(
|
||||
self,
|
||||
mocker: MockerFixture,
|
||||
) -> None:
|
||||
"""Verifies the task body propagates SearchIndexLockError so Celery's
|
||||
autoretry_for can catch it (rather than the task swallowing the error
|
||||
and silently succeeding)."""
|
||||
doc = DocumentFactory()
|
||||
|
||||
mock_batch = mocker.MagicMock()
|
||||
mock_batch.__enter__ = mocker.MagicMock(
|
||||
side_effect=SearchIndexLockError("exhausted"),
|
||||
)
|
||||
mock_batch.__exit__ = mocker.MagicMock(return_value=False)
|
||||
|
||||
mock_backend = mocker.MagicMock()
|
||||
mock_backend.batch_update.return_value = mock_batch
|
||||
|
||||
# get_backend is imported lazily inside the task: `from documents.search import get_backend`
|
||||
mocker.patch("documents.search.get_backend", return_value=mock_backend)
|
||||
|
||||
with pytest.raises(SearchIndexLockError):
|
||||
index_document(doc.pk)
|
||||
Reference in New Issue
Block a user