diff --git a/src/documents/management/commands/document_consumer.py b/src/documents/management/commands/document_consumer.py index acfbf6e3d..2e6b414d9 100644 --- a/src/documents/management/commands/document_consumer.py +++ b/src/documents/management/commands/document_consumer.py @@ -169,6 +169,10 @@ class FileStabilityTracker: self._tracked.pop(path, None) yield path + def is_tracking(self, path: Path) -> bool: + """Check whether a path is currently being tracked for stability.""" + return path.resolve() in self._tracked + def has_pending_files(self) -> bool: """Check if there are files waiting for stability check.""" return len(self._tracked) > 0 @@ -370,6 +374,16 @@ class Command(BaseCommand): # Testing timeout in seconds testing_timeout_s: Final[float] = 0.5 + # How often to perform a full-glob rescan of the consume directory as a + # safety net. Each watchfiles watcher is torn down and recreated on every + # batch to reconfigure its timeout, and a fresh watcher silently adopts the + # current directory contents as its baseline. A file that appears between + # one batch and the next watcher's baseline is therefore never reported and + # would sit in the consume directory forever. This periodic rescan re-injects + # such files into the stability tracker (see GH issue #13011). Not currently + # user-configurable; instances may override for testing. + rescan_interval_s: float = 300.0 + def add_arguments(self, parser) -> None: parser.add_argument( "directory", @@ -425,7 +439,7 @@ class Command(BaseCommand): ) # Process existing files - self._process_existing_files( + queued = self._process_existing_files( directory=directory, recursive=recursive, subdirs_as_tags=subdirs_as_tags, @@ -445,6 +459,7 @@ class Command(BaseCommand): polling_interval=polling_interval, stability_delay=stability_delay, is_testing=is_testing, + queued=queued, ) logger.debug("Consumer exiting") @@ -456,11 +471,18 @@ class Command(BaseCommand): recursive: bool, subdirs_as_tags: bool, consumer_filter: ConsumerFilter, - ) -> None: - """Process any existing files in the consumption directory.""" + ) -> set[Path]: + """ + Process any existing files in the consumption directory. + + Returns the set of resolved paths that were queued, so the watch loop + can seed its in-flight set and avoid re-queuing them on the first + rescan before the consume tasks have removed them from disk. + """ logger.info(f"Processing existing files in {directory}") glob_pattern = "**/*" if recursive else "*" + queued: set[Path] = set() for filepath in directory.glob(glob_pattern): # Use filter to check if file should be processed @@ -475,6 +497,48 @@ class Command(BaseCommand): consumption_dir=directory, subdirs_as_tags=subdirs_as_tags, ) + queued.add(filepath.resolve()) + + return queued + + def _rescan_existing_files( + self, + *, + directory: Path, + recursive: bool, + consumer_filter: ConsumerFilter, + tracker: FileStabilityTracker, + queued: set[Path], + ) -> None: + """ + Re-inject on-disk files the watcher never reported into the tracker. + + Acts as a safety net for files stranded by the watcher-recreation gap + (see ``rescan_interval_s``). Files already being tracked or already + queued and awaiting consumption are skipped, so a file is never queued + twice. Queued paths that have since left the directory are pruned so a + later file reusing the same name is not skipped forever. + """ + # Prune in-flight paths that have left the directory + for path in list(queued): + if not path.exists(): + queued.discard(path) + + glob_pattern = "**/*" if recursive else "*" + + for filepath in directory.glob(glob_pattern): + if not filepath.is_file(): + continue + + if not consumer_filter(Change.added, str(filepath)): + continue + + resolved = filepath.resolve() + if tracker.is_tracking(resolved) or resolved in queued: + continue + + logger.debug(f"Rescan found untracked file: {resolved}") + tracker.track(resolved, Change.added) def _watch_directory( self, @@ -486,11 +550,24 @@ class Command(BaseCommand): polling_interval: float, stability_delay: float, is_testing: bool, + queued: set[Path] | None = None, ) -> None: """Watch directory for changes and process stable files.""" use_polling = polling_interval > 0 poll_delay_ms = int(polling_interval * 1000) if use_polling else 0 + # Resolved paths that have been queued and are awaiting consumption. + # Seeded from the startup scan so the first rescan does not re-queue + # files whose consume tasks have not yet removed them from disk. + queued = set() if queued is None else queued + + # Full-glob safety net cadence (0 disables) + rescan_interval_s = self.rescan_interval_s + rescan_timeout_ms = ( + int(rescan_interval_s * 1000) if rescan_interval_s > 0 else 0 + ) + last_rescan = monotonic() + if use_polling: logger.info( f"Watching {directory} using polling (interval: {polling_interval}s)", @@ -505,6 +582,20 @@ class Command(BaseCommand): stability_timeout_ms = int(stability_delay * 1000) testing_timeout_ms = int(self.testing_timeout_s * 1000) + def cap_for_rescan(ms: int) -> int: + """ + Ensure the watch loop wakes often enough to run the rescan. + + ``watch()`` blocks for up to ``rust_timeout``, so the rescan can + only run that often. A timeout of 0 means "wait indefinitely", + which would never wake to rescan; cap it at the rescan interval. + """ + if rescan_timeout_ms <= 0: + return ms + if ms <= 0: + return rescan_timeout_ms + return min(ms, rescan_timeout_ms) + # Calculate appropriate timeout for watch loop # In polling mode, rust_timeout must be significantly longer than poll_delay_ms # to ensure poll cycles can complete before timing out @@ -522,6 +613,8 @@ class Command(BaseCommand): # Not testing, wait indefinitely for first event timeout_ms = 0 + timeout_ms = cap_for_rescan(timeout_ms) + self.stop_flag.clear() while not self.stop_flag.is_set(): @@ -551,10 +644,26 @@ class Command(BaseCommand): consumption_dir=directory, subdirs_as_tags=subdirs_as_tags, ) + # Remember it so the rescan does not re-queue it while + # the consume task has yet to remove it from disk + queued.add(stable_path) # Exit watch loop to reconfigure timeout break + # Periodic full-glob safety net for files the watcher missed + if rescan_timeout_ms > 0 and ( + monotonic() - last_rescan >= rescan_interval_s + ): + self._rescan_existing_files( + directory=directory, + recursive=recursive, + consumer_filter=consumer_filter, + tracker=tracker, + queued=queued, + ) + last_rescan = monotonic() + # Determine next timeout if tracker.has_pending_files(): # Check pending files at stability interval @@ -572,6 +681,8 @@ class Command(BaseCommand): # No pending files, wait indefinitely timeout_ms = 0 + timeout_ms = cap_for_rescan(timeout_ms) + except KeyboardInterrupt: # pragma: nocover logger.info("Received interrupt, stopping consumer") self.stop_flag.set() diff --git a/src/documents/tests/test_management_consumer.py b/src/documents/tests/test_management_consumer.py index 707397788..a5f8fb350 100644 --- a/src/documents/tests/test_management_consumer.py +++ b/src/documents/tests/test_management_consumer.py @@ -684,6 +684,7 @@ class ConsumerThread(Thread): subdirs_as_tags: bool = False, polling_interval: float = 0, stability_delay: float = 0.1, + rescan_interval: float | None = None, ) -> None: super().__init__() self.consumption_dir = consumption_dir @@ -693,6 +694,8 @@ class ConsumerThread(Thread): self.polling_interval = polling_interval self.stability_delay = stability_delay self.cmd = Command() + if rescan_interval is not None: + self.cmd.rescan_interval_s = rescan_interval self.cmd.stop_flag.clear() # Non-daemon ensures finally block runs and connections are closed self.daemon = False @@ -1052,3 +1055,200 @@ class TestCommandWatchEdgeCases: thread.stop_and_wait(timeout=5.0) # Clean up any Tags created by the thread Tag.objects.all().delete() + + +class TestRescanExistingFiles: + """ + Unit tests for the rescan safety net. + + Each ``watch()`` recreation silently adopts the current directory contents + as its baseline, so a file appearing between one batch and the next + watcher's baseline is never reported and would sit in the consume directory + forever. ``_rescan_existing_files`` re-injects such files into the + stability tracker as a periodic safety net (see GH issue #13011). + """ + + @pytest.fixture + def pdf_only_filter(self) -> ConsumerFilter: + return ConsumerFilter( + supported_extensions=frozenset({".pdf"}), + ignore_patterns=[], + ) + + def _rescan( + self, + directory: Path, + consumer_filter: ConsumerFilter, + tracker: FileStabilityTracker, + queued: set[Path], + *, + recursive: bool = False, + ) -> None: + Command()._rescan_existing_files( + directory=directory, + recursive=recursive, + consumer_filter=consumer_filter, + tracker=tracker, + queued=queued, + ) + + def test_tracks_stranded_file( + self, + consumption_dir: Path, + sample_pdf: Path, + pdf_only_filter: ConsumerFilter, + ) -> None: + """A supported on-disk file the watcher never reported gets tracked.""" + target = consumption_dir / "stranded.pdf" + shutil.copy(sample_pdf, target) + tracker = FileStabilityTracker(stability_delay=0.1) + + self._rescan(consumption_dir, pdf_only_filter, tracker, set()) + + assert tracker.is_tracking(target) is True + assert tracker.pending_count == 1 + + def test_skips_already_tracked_file( + self, + consumption_dir: Path, + sample_pdf: Path, + pdf_only_filter: ConsumerFilter, + ) -> None: + """A file already being tracked by the watcher is not double-tracked.""" + target = consumption_dir / "tracked.pdf" + shutil.copy(sample_pdf, target) + tracker = FileStabilityTracker(stability_delay=0.1) + tracker.track(target, Change.added) + + self._rescan(consumption_dir, pdf_only_filter, tracker, set()) + + assert tracker.pending_count == 1 + + def test_skips_queued_file( + self, + consumption_dir: Path, + sample_pdf: Path, + pdf_only_filter: ConsumerFilter, + ) -> None: + """A file already queued and awaiting consumption is not re-tracked.""" + target = consumption_dir / "inflight.pdf" + shutil.copy(sample_pdf, target) + tracker = FileStabilityTracker(stability_delay=0.1) + queued = {target.resolve()} + + self._rescan(consumption_dir, pdf_only_filter, tracker, queued) + + assert tracker.pending_count == 0 + + def test_prunes_vanished_queued_paths( + self, + consumption_dir: Path, + pdf_only_filter: ConsumerFilter, + ) -> None: + """Queued paths no longer on disk are dropped so the name can recur.""" + gone = (consumption_dir / "gone.pdf").resolve() + tracker = FileStabilityTracker(stability_delay=0.1) + queued = {gone} + + self._rescan(consumption_dir, pdf_only_filter, tracker, queued) + + assert gone not in queued + + def test_skips_unsupported_extension( + self, + consumption_dir: Path, + pdf_only_filter: ConsumerFilter, + ) -> None: + """Files filtered out by the consumer filter are not tracked.""" + (consumption_dir / "notes.xyz").write_bytes(b"content") + tracker = FileStabilityTracker(stability_delay=0.1) + + self._rescan(consumption_dir, pdf_only_filter, tracker, set()) + + assert tracker.pending_count == 0 + + def test_recursive_respects_flag( + self, + consumption_dir: Path, + sample_pdf: Path, + pdf_only_filter: ConsumerFilter, + ) -> None: + """Nested files are only found when recursive scanning is enabled.""" + subdir = consumption_dir / "nested" + subdir.mkdir() + target = subdir / "deep.pdf" + shutil.copy(sample_pdf, target) + + shallow = FileStabilityTracker(stability_delay=0.1) + self._rescan(consumption_dir, pdf_only_filter, shallow, set()) + assert shallow.pending_count == 0 + + deep = FileStabilityTracker(stability_delay=0.1) + self._rescan(consumption_dir, pdf_only_filter, deep, set(), recursive=True) + assert deep.is_tracking(target) is True + + +class TestProcessExistingFilesQueued: + """Tests that startup processing reports which paths it queued.""" + + @pytest.mark.usefixtures("mock_supported_extensions") + def test_returns_queued_paths( + self, + consumption_dir: Path, + sample_pdf: Path, + mock_consume_file_delay: MagicMock, + settings: SettingsWrapper, + ) -> None: + """The set returned seeds the rescan's queued set, avoiding re-queue.""" + target = consumption_dir / "document.pdf" + shutil.copy(sample_pdf, target) + settings.CONSUMER_IGNORE_PATTERNS = [] + + queued = Command()._process_existing_files( + directory=consumption_dir, + recursive=False, + subdirs_as_tags=False, + consumer_filter=ConsumerFilter(ignore_patterns=[]), + ) + + assert target.resolve() in queued + + +@pytest.mark.management +@pytest.mark.django_db +class TestCommandRescanRecovery: + """End-to-end test that the rescan recovers files the watcher misses.""" + + def test_rescan_consumes_file_the_watcher_never_reports( + self, + consumption_dir: Path, + sample_pdf: Path, + mock_consume_file_delay: MagicMock, + start_consumer: Callable[..., ConsumerThread], + ) -> None: + """ + Isolate the rescan path: a long polling interval guarantees the + watcher cannot report the file within the test window, so only the + periodic rescan can consume it. + """ + # poll interval far longer than the test window -> watcher stays silent + thread = start_consumer( + polling_interval=30.0, + stability_delay=0.1, + rescan_interval=0.5, + ) + + # created after startup, so _process_existing_files did not see it + target = consumption_dir / "stranded.pdf" + shutil.copy(sample_pdf, target) + + wait_for_mock_call(mock_consume_file_delay.apply_async, timeout_s=5.0) + + if thread.exception: + raise thread.exception + + mock_consume_file_delay.apply_async.assert_called() + call_args = mock_consume_file_delay.apply_async.call_args.kwargs["kwargs"][ + "input_doc" + ] + assert call_args.original_file.name == "stranded.pdf"