mirror of
https://github.com/paperless-ngx/paperless-ngx.git
synced 2026-02-17 09:03:57 +00:00
Compare commits
1 Commits
main
...
fix-celery
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b0729b9fce |
@@ -151,6 +151,50 @@ class TestSystemStatus(APITestCase):
|
||||
self.assertEqual(response.status_code, status.HTTP_200_OK)
|
||||
self.assertEqual(response.data["tasks"]["celery_status"], "OK")
|
||||
|
||||
@mock.patch("celery.app.control.Inspect.ping")
|
||||
def test_system_status_celery_ping_none(self, mock_ping) -> None:
|
||||
"""
|
||||
GIVEN:
|
||||
- Celery ping returns no worker responses
|
||||
WHEN:
|
||||
- The user requests the system status
|
||||
THEN:
|
||||
- The response contains an error celery status
|
||||
"""
|
||||
mock_ping.return_value = None
|
||||
self.client.force_login(self.user)
|
||||
response = self.client.get(self.ENDPOINT)
|
||||
self.assertEqual(response.status_code, status.HTTP_200_OK)
|
||||
self.assertEqual(response.data["tasks"]["celery_status"], "WARNING")
|
||||
self.assertEqual(
|
||||
response.data["tasks"]["celery_error"],
|
||||
"No celery workers responded to ping. This may be temporary.",
|
||||
)
|
||||
|
||||
@mock.patch("documents.views.sleep")
|
||||
@mock.patch("celery.app.control.Inspect.ping")
|
||||
def test_system_status_celery_ping_retry_success(
|
||||
self,
|
||||
mock_ping,
|
||||
mock_sleep,
|
||||
) -> None:
|
||||
"""
|
||||
GIVEN:
|
||||
- Celery ping fails once but succeeds on retry
|
||||
WHEN:
|
||||
- The user requests the system status
|
||||
THEN:
|
||||
- The response contains an OK celery status
|
||||
"""
|
||||
mock_ping.side_effect = [None, {"hostname": {"ok": "pong"}}]
|
||||
self.client.force_login(self.user)
|
||||
response = self.client.get(self.ENDPOINT)
|
||||
self.assertEqual(response.status_code, status.HTTP_200_OK)
|
||||
self.assertEqual(response.data["tasks"]["celery_status"], "OK")
|
||||
self.assertIsNone(response.data["tasks"]["celery_error"])
|
||||
self.assertEqual(mock_ping.call_count, 2)
|
||||
mock_sleep.assert_called_once_with(0.25)
|
||||
|
||||
@override_settings(INDEX_DIR=Path("/tmp/index"))
|
||||
@mock.patch("whoosh.index.FileIndex.last_modified")
|
||||
def test_system_status_index_ok(self, mock_last_modified):
|
||||
|
||||
@@ -10,6 +10,7 @@ from collections import deque
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from time import mktime
|
||||
from time import sleep
|
||||
from typing import Literal
|
||||
from unicodedata import normalize
|
||||
from urllib.parse import quote
|
||||
@@ -3035,11 +3036,29 @@ class SystemStatusView(PassUserMixin):
|
||||
celery_error = None
|
||||
celery_url = None
|
||||
try:
|
||||
celery_ping = celery_app.control.inspect().ping()
|
||||
celery_url = next(iter(celery_ping.keys()))
|
||||
first_worker_ping = celery_ping[celery_url]
|
||||
if first_worker_ping["ok"] == "pong":
|
||||
celery_active = "OK"
|
||||
celery_ping = None
|
||||
for ping_attempt in range(3):
|
||||
celery_ping = celery_app.control.inspect().ping()
|
||||
if celery_ping:
|
||||
break
|
||||
if ping_attempt < 2:
|
||||
sleep(0.25)
|
||||
|
||||
if not celery_ping:
|
||||
celery_active = "WARNING"
|
||||
celery_error = (
|
||||
"No celery workers responded to ping. This may be temporary."
|
||||
)
|
||||
else:
|
||||
celery_url, first_worker_ping = next(iter(celery_ping.items()))
|
||||
if (
|
||||
isinstance(first_worker_ping, dict)
|
||||
and first_worker_ping.get("ok") == "pong"
|
||||
):
|
||||
celery_active = "OK"
|
||||
else:
|
||||
celery_active = "WARNING"
|
||||
celery_error = "Celery worker responded unexpectedly."
|
||||
except Exception as e:
|
||||
celery_active = "ERROR"
|
||||
logger.exception(
|
||||
|
||||
Reference in New Issue
Block a user