mirror of
https://github.com/paperless-ngx/paperless-ngx.git
synced 2026-04-16 04:58:53 +00:00
test(tasks): improve test_api_tasks.py structure and add api marker
- Move admin_client, v9_client, user_client fixtures to conftest.py so they can be reused by other API tests; all three now build on the rest_api_client fixture instead of creating APIClient() directly - Move regular_user fixture to conftest.py (was already done, now also used by the new client fixtures) - Add docstrings to every test method describing the behaviour under test - Move timedelta/timezone imports to module level - Register 'api' pytest marker in pyproject.toml and apply pytestmark to the entire file so all 40 tests are selectable via -m api Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -311,6 +311,7 @@ markers = [
|
||||
"date_parsing: Tests which cover date parsing from content or filename",
|
||||
"management: Tests which cover management commands/functionality",
|
||||
"search: Tests for the Tantivy search backend",
|
||||
"api: Tests for REST API endpoints",
|
||||
]
|
||||
|
||||
[tool.pytest_env]
|
||||
|
||||
@@ -13,6 +13,8 @@ from rest_framework.test import APIClient
|
||||
|
||||
from documents.tests.factories import DocumentFactory
|
||||
|
||||
UserModelT = get_user_model()
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from documents.models import Document
|
||||
|
||||
@@ -126,15 +128,34 @@ def rest_api_client():
|
||||
yield APIClient()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def authenticated_rest_api_client(rest_api_client: APIClient):
|
||||
"""
|
||||
The basic DRF ApiClient which has been authenticated
|
||||
"""
|
||||
UserModel = get_user_model()
|
||||
user = UserModel.objects.create_user(username="testuser", password="password")
|
||||
rest_api_client.force_authenticate(user=user)
|
||||
yield rest_api_client
|
||||
@pytest.fixture()
|
||||
def regular_user(django_user_model: type[UserModelT]) -> UserModelT:
|
||||
"""Unprivileged authenticated user for permission boundary tests."""
|
||||
return django_user_model.objects.create_user(username="regular", password="regular")
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def admin_client(rest_api_client: APIClient, admin_user: UserModelT) -> APIClient:
|
||||
"""Admin client pre-authenticated and sending the v10 Accept header."""
|
||||
rest_api_client.force_authenticate(user=admin_user)
|
||||
rest_api_client.credentials(HTTP_ACCEPT="application/json; version=10")
|
||||
return rest_api_client
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def v9_client(rest_api_client: APIClient, admin_user: UserModelT) -> APIClient:
|
||||
"""Admin client pre-authenticated and sending the v9 Accept header."""
|
||||
rest_api_client.force_authenticate(user=admin_user)
|
||||
rest_api_client.credentials(HTTP_ACCEPT="application/json; version=9")
|
||||
return rest_api_client
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def user_client(rest_api_client: APIClient, regular_user: UserModelT) -> APIClient:
|
||||
"""Regular-user client pre-authenticated and sending the v10 Accept header."""
|
||||
rest_api_client.force_authenticate(user=regular_user)
|
||||
rest_api_client.credentials(HTTP_ACCEPT="application/json; version=10")
|
||||
return rest_api_client
|
||||
|
||||
|
||||
@pytest.fixture(scope="session", autouse=True)
|
||||
|
||||
@@ -7,62 +7,30 @@ Covers:
|
||||
"""
|
||||
|
||||
import uuid
|
||||
from datetime import timedelta
|
||||
from unittest import mock
|
||||
|
||||
import pytest
|
||||
from django.contrib.auth.models import Permission
|
||||
from django.contrib.auth.models import User
|
||||
from django.utils import timezone
|
||||
from rest_framework import status
|
||||
from rest_framework.test import APIClient
|
||||
|
||||
from documents.models import PaperlessTask
|
||||
from documents.tests.factories import PaperlessTaskFactory
|
||||
|
||||
pytestmark = pytest.mark.api
|
||||
|
||||
ENDPOINT = "/api/tasks/"
|
||||
ACCEPT_V10 = "application/json; version=10"
|
||||
ACCEPT_V9 = "application/json; version=9"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Fixtures
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def regular_user(django_user_model: User) -> User:
|
||||
return django_user_model.objects.create_user(username="regular", password="regular")
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def admin_client(admin_user: User) -> APIClient:
|
||||
"""Authenticated admin client sending v10 Accept header."""
|
||||
client = APIClient()
|
||||
client.force_authenticate(user=admin_user)
|
||||
client.credentials(HTTP_ACCEPT=ACCEPT_V10)
|
||||
return client
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def v9_client(admin_user: User) -> APIClient:
|
||||
"""Authenticated admin client sending v9 Accept header."""
|
||||
client = APIClient()
|
||||
client.force_authenticate(user=admin_user)
|
||||
client.credentials(HTTP_ACCEPT=ACCEPT_V9)
|
||||
return client
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def user_client(regular_user: User) -> APIClient:
|
||||
"""Authenticated regular-user client sending v10 Accept header."""
|
||||
client = APIClient()
|
||||
client.force_authenticate(user=regular_user)
|
||||
client.credentials(HTTP_ACCEPT=ACCEPT_V10)
|
||||
return client
|
||||
|
||||
|
||||
@pytest.mark.django_db()
|
||||
class TestGetTasksV10:
|
||||
def test_list_returns_tasks(self, admin_client: APIClient) -> None:
|
||||
"""GET /api/tasks/ returns all tasks visible to the admin."""
|
||||
PaperlessTaskFactory.create_batch(2)
|
||||
|
||||
response = admin_client.get(ENDPOINT)
|
||||
@@ -74,6 +42,7 @@ class TestGetTasksV10:
|
||||
self,
|
||||
admin_client: APIClient,
|
||||
) -> None:
|
||||
"""related_document_ids includes the consumed document_id from result_data."""
|
||||
PaperlessTaskFactory(
|
||||
status=PaperlessTask.Status.SUCCESS,
|
||||
result_data={"document_id": 7},
|
||||
@@ -88,6 +57,7 @@ class TestGetTasksV10:
|
||||
self,
|
||||
admin_client: APIClient,
|
||||
) -> None:
|
||||
"""related_document_ids includes duplicate_of when the file was already archived."""
|
||||
PaperlessTaskFactory(
|
||||
status=PaperlessTask.Status.SUCCESS,
|
||||
result_data={"duplicate_of": 12},
|
||||
@@ -99,6 +69,7 @@ class TestGetTasksV10:
|
||||
assert response.data[0]["related_document_ids"] == [12]
|
||||
|
||||
def test_filter_by_task_type(self, admin_client: APIClient) -> None:
|
||||
"""?task_type= filters results to tasks of that type only."""
|
||||
PaperlessTaskFactory(task_type=PaperlessTask.TaskType.CONSUME_FILE)
|
||||
PaperlessTaskFactory(task_type=PaperlessTask.TaskType.TRAIN_CLASSIFIER)
|
||||
|
||||
@@ -112,6 +83,7 @@ class TestGetTasksV10:
|
||||
assert response.data[0]["task_type"] == PaperlessTask.TaskType.TRAIN_CLASSIFIER
|
||||
|
||||
def test_filter_by_status(self, admin_client: APIClient) -> None:
|
||||
"""?status= filters results to tasks with that status only."""
|
||||
PaperlessTaskFactory(status=PaperlessTask.Status.PENDING)
|
||||
PaperlessTaskFactory(status=PaperlessTask.Status.SUCCESS)
|
||||
|
||||
@@ -125,8 +97,9 @@ class TestGetTasksV10:
|
||||
assert response.data[0]["status"] == PaperlessTask.Status.SUCCESS
|
||||
|
||||
def test_filter_by_task_id(self, admin_client: APIClient) -> None:
|
||||
"""?task_id= returns only the task with that UUID."""
|
||||
task = PaperlessTaskFactory()
|
||||
PaperlessTaskFactory() # another task that should not appear
|
||||
PaperlessTaskFactory() # unrelated task that should not appear
|
||||
|
||||
response = admin_client.get(ENDPOINT, {"task_id": task.task_id})
|
||||
|
||||
@@ -135,6 +108,7 @@ class TestGetTasksV10:
|
||||
assert response.data[0]["task_id"] == task.task_id
|
||||
|
||||
def test_filter_by_acknowledged(self, admin_client: APIClient) -> None:
|
||||
"""?acknowledged=false returns only tasks that have not been acknowledged."""
|
||||
PaperlessTaskFactory(acknowledged=False)
|
||||
PaperlessTaskFactory(acknowledged=True)
|
||||
|
||||
@@ -145,6 +119,7 @@ class TestGetTasksV10:
|
||||
assert response.data[0]["acknowledged"] is False
|
||||
|
||||
def test_filter_is_complete_true(self, admin_client: APIClient) -> None:
|
||||
"""?is_complete=true returns only SUCCESS and FAILURE tasks."""
|
||||
PaperlessTaskFactory(status=PaperlessTask.Status.PENDING)
|
||||
PaperlessTaskFactory(status=PaperlessTask.Status.SUCCESS)
|
||||
PaperlessTaskFactory(status=PaperlessTask.Status.FAILURE)
|
||||
@@ -159,23 +134,8 @@ class TestGetTasksV10:
|
||||
PaperlessTask.Status.FAILURE,
|
||||
}
|
||||
|
||||
def test_default_ordering_is_newest_first(self, admin_client: APIClient) -> None:
|
||||
from datetime import timedelta
|
||||
|
||||
from django.utils import timezone
|
||||
|
||||
base = timezone.now()
|
||||
t1 = PaperlessTaskFactory(date_created=base)
|
||||
t2 = PaperlessTaskFactory(date_created=base + timedelta(seconds=1))
|
||||
t3 = PaperlessTaskFactory(date_created=base + timedelta(seconds=2))
|
||||
|
||||
response = admin_client.get(ENDPOINT)
|
||||
|
||||
assert response.status_code == status.HTTP_200_OK
|
||||
ids = [t["task_id"] for t in response.data]
|
||||
assert ids == [t3.task_id, t2.task_id, t1.task_id]
|
||||
|
||||
def test_filter_is_complete_false(self, admin_client: APIClient) -> None:
|
||||
"""?is_complete=false returns only PENDING and STARTED tasks."""
|
||||
PaperlessTaskFactory(status=PaperlessTask.Status.PENDING)
|
||||
PaperlessTaskFactory(status=PaperlessTask.Status.STARTED)
|
||||
PaperlessTaskFactory(status=PaperlessTask.Status.SUCCESS)
|
||||
@@ -190,6 +150,19 @@ class TestGetTasksV10:
|
||||
PaperlessTask.Status.STARTED,
|
||||
}
|
||||
|
||||
def test_default_ordering_is_newest_first(self, admin_client: APIClient) -> None:
|
||||
"""Tasks are returned in descending date_created order (newest first)."""
|
||||
base = timezone.now()
|
||||
t1 = PaperlessTaskFactory(date_created=base)
|
||||
t2 = PaperlessTaskFactory(date_created=base + timedelta(seconds=1))
|
||||
t3 = PaperlessTaskFactory(date_created=base + timedelta(seconds=2))
|
||||
|
||||
response = admin_client.get(ENDPOINT)
|
||||
|
||||
assert response.status_code == status.HTTP_200_OK
|
||||
ids = [t["task_id"] for t in response.data]
|
||||
assert ids == [t3.task_id, t2.task_id, t1.task_id]
|
||||
|
||||
def test_list_is_owner_aware(
|
||||
self,
|
||||
admin_user: User,
|
||||
@@ -220,6 +193,7 @@ class TestGetTasksV10:
|
||||
@pytest.mark.django_db()
|
||||
class TestGetTasksV9:
|
||||
def test_task_name_equals_task_type_value(self, v9_client: APIClient) -> None:
|
||||
"""task_name mirrors the task_type value for v9 backwards compatibility."""
|
||||
PaperlessTaskFactory(task_type=PaperlessTask.TaskType.CONSUME_FILE)
|
||||
|
||||
response = v9_client.get(ENDPOINT)
|
||||
@@ -228,6 +202,7 @@ class TestGetTasksV9:
|
||||
assert response.data[0]["task_name"] == "consume_file"
|
||||
|
||||
def test_task_file_name_from_input_data(self, v9_client: APIClient) -> None:
|
||||
"""task_file_name is read from input_data['filename']."""
|
||||
PaperlessTaskFactory(input_data={"filename": "report.pdf"})
|
||||
|
||||
response = v9_client.get(ENDPOINT)
|
||||
@@ -239,6 +214,7 @@ class TestGetTasksV9:
|
||||
self,
|
||||
v9_client: APIClient,
|
||||
) -> None:
|
||||
"""task_file_name is None when filename is absent from input_data."""
|
||||
PaperlessTaskFactory(input_data={})
|
||||
|
||||
response = v9_client.get(ENDPOINT)
|
||||
@@ -247,6 +223,7 @@ class TestGetTasksV9:
|
||||
assert response.data[0]["task_file_name"] is None
|
||||
|
||||
def test_type_scheduled_maps_to_scheduled_task(self, v9_client: APIClient) -> None:
|
||||
"""trigger_source=scheduled maps to type='SCHEDULED_TASK' in v9."""
|
||||
PaperlessTaskFactory(trigger_source=PaperlessTask.TriggerSource.SCHEDULED)
|
||||
|
||||
response = v9_client.get(ENDPOINT)
|
||||
@@ -255,6 +232,7 @@ class TestGetTasksV9:
|
||||
assert response.data[0]["type"] == "SCHEDULED_TASK"
|
||||
|
||||
def test_type_system_maps_to_auto_task(self, v9_client: APIClient) -> None:
|
||||
"""trigger_source=system maps to type='AUTO_TASK' in v9."""
|
||||
PaperlessTaskFactory(trigger_source=PaperlessTask.TriggerSource.SYSTEM)
|
||||
|
||||
response = v9_client.get(ENDPOINT)
|
||||
@@ -263,6 +241,7 @@ class TestGetTasksV9:
|
||||
assert response.data[0]["type"] == "AUTO_TASK"
|
||||
|
||||
def test_type_web_ui_maps_to_manual_task(self, v9_client: APIClient) -> None:
|
||||
"""trigger_source=web_ui maps to type='MANUAL_TASK' in v9."""
|
||||
PaperlessTaskFactory(trigger_source=PaperlessTask.TriggerSource.WEB_UI)
|
||||
|
||||
response = v9_client.get(ENDPOINT)
|
||||
@@ -271,6 +250,7 @@ class TestGetTasksV9:
|
||||
assert response.data[0]["type"] == "MANUAL_TASK"
|
||||
|
||||
def test_type_manual_maps_to_manual_task(self, v9_client: APIClient) -> None:
|
||||
"""trigger_source=manual maps to type='MANUAL_TASK' in v9."""
|
||||
PaperlessTaskFactory(trigger_source=PaperlessTask.TriggerSource.MANUAL)
|
||||
|
||||
response = v9_client.get(ENDPOINT)
|
||||
@@ -282,6 +262,7 @@ class TestGetTasksV9:
|
||||
self,
|
||||
v9_client: APIClient,
|
||||
) -> None:
|
||||
"""related_document is taken from result_data['document_id'] in v9."""
|
||||
PaperlessTaskFactory(
|
||||
status=PaperlessTask.Status.SUCCESS,
|
||||
result_data={"document_id": 99},
|
||||
@@ -296,6 +277,7 @@ class TestGetTasksV9:
|
||||
self,
|
||||
v9_client: APIClient,
|
||||
) -> None:
|
||||
"""related_document is None when result_data is absent in v9."""
|
||||
PaperlessTaskFactory(result_data=None)
|
||||
|
||||
response = v9_client.get(ENDPOINT)
|
||||
@@ -304,6 +286,7 @@ class TestGetTasksV9:
|
||||
assert response.data[0]["related_document"] is None
|
||||
|
||||
def test_duplicate_documents_from_result_data(self, v9_client: APIClient) -> None:
|
||||
"""duplicate_documents includes duplicate_of from result_data in v9."""
|
||||
PaperlessTaskFactory(
|
||||
status=PaperlessTask.Status.SUCCESS,
|
||||
result_data={"duplicate_of": 55},
|
||||
@@ -318,6 +301,7 @@ class TestGetTasksV9:
|
||||
self,
|
||||
v9_client: APIClient,
|
||||
) -> None:
|
||||
"""duplicate_documents is an empty list when result_data is absent in v9."""
|
||||
PaperlessTaskFactory(result_data=None)
|
||||
|
||||
response = v9_client.get(ENDPOINT)
|
||||
@@ -326,7 +310,7 @@ class TestGetTasksV9:
|
||||
assert response.data[0]["duplicate_documents"] == []
|
||||
|
||||
def test_filter_by_task_name_maps_to_task_type(self, v9_client: APIClient) -> None:
|
||||
"""v9 ?task_name=consume_file filter maps to the task_type field."""
|
||||
"""?task_name=consume_file filter maps to the task_type field for v9 compatibility."""
|
||||
PaperlessTaskFactory(task_type=PaperlessTask.TaskType.CONSUME_FILE)
|
||||
PaperlessTaskFactory(task_type=PaperlessTask.TaskType.TRAIN_CLASSIFIER)
|
||||
|
||||
@@ -337,7 +321,7 @@ class TestGetTasksV9:
|
||||
assert response.data[0]["task_name"] == "consume_file"
|
||||
|
||||
def test_filter_by_type_maps_to_trigger_source(self, v9_client: APIClient) -> None:
|
||||
"""v9 ?type=SCHEDULED_TASK filter maps to trigger_source=scheduled."""
|
||||
"""?type=SCHEDULED_TASK filter maps to trigger_source=scheduled for v9 compatibility."""
|
||||
PaperlessTaskFactory(trigger_source=PaperlessTask.TriggerSource.SCHEDULED)
|
||||
PaperlessTaskFactory(trigger_source=PaperlessTask.TriggerSource.WEB_UI)
|
||||
|
||||
@@ -351,6 +335,7 @@ class TestGetTasksV9:
|
||||
@pytest.mark.django_db()
|
||||
class TestAcknowledge:
|
||||
def test_returns_count(self, admin_client: APIClient) -> None:
|
||||
"""POST acknowledge/ returns the count of tasks that were acknowledged."""
|
||||
task1 = PaperlessTaskFactory()
|
||||
task2 = PaperlessTaskFactory()
|
||||
|
||||
@@ -367,6 +352,7 @@ class TestAcknowledge:
|
||||
self,
|
||||
admin_client: APIClient,
|
||||
) -> None:
|
||||
"""Acknowledged tasks no longer appear when filtering with ?acknowledged=false."""
|
||||
task = PaperlessTaskFactory()
|
||||
admin_client.post(
|
||||
ENDPOINT + "acknowledge/",
|
||||
@@ -380,6 +366,7 @@ class TestAcknowledge:
|
||||
assert len(response.data) == 0
|
||||
|
||||
def test_requires_change_permission(self, user_client: APIClient) -> None:
|
||||
"""Regular users without change_paperlesstask permission receive 403."""
|
||||
task = PaperlessTaskFactory()
|
||||
|
||||
response = user_client.post(
|
||||
@@ -391,6 +378,7 @@ class TestAcknowledge:
|
||||
assert response.status_code == status.HTTP_403_FORBIDDEN
|
||||
|
||||
def test_succeeds_with_change_permission(self, regular_user: User) -> None:
|
||||
"""Users granted change_paperlesstask permission can acknowledge tasks."""
|
||||
regular_user.user_permissions.add(
|
||||
Permission.objects.get(codename="change_paperlesstask"),
|
||||
)
|
||||
@@ -413,6 +401,7 @@ class TestAcknowledge:
|
||||
@pytest.mark.django_db()
|
||||
class TestAcknowledgeAll:
|
||||
def test_marks_only_completed_tasks(self, admin_client: APIClient) -> None:
|
||||
"""acknowledge_all/ marks only SUCCESS and FAILURE tasks as acknowledged."""
|
||||
PaperlessTaskFactory(status=PaperlessTask.Status.SUCCESS, acknowledged=False)
|
||||
PaperlessTaskFactory(status=PaperlessTask.Status.FAILURE, acknowledged=False)
|
||||
PaperlessTaskFactory(status=PaperlessTask.Status.PENDING, acknowledged=False)
|
||||
@@ -423,6 +412,7 @@ class TestAcknowledgeAll:
|
||||
assert response.data == {"result": 2}
|
||||
|
||||
def test_skips_already_acknowledged(self, admin_client: APIClient) -> None:
|
||||
"""acknowledge_all/ does not re-acknowledge tasks that are already acknowledged."""
|
||||
PaperlessTaskFactory(status=PaperlessTask.Status.SUCCESS, acknowledged=True)
|
||||
PaperlessTaskFactory(status=PaperlessTask.Status.SUCCESS, acknowledged=False)
|
||||
|
||||
@@ -432,6 +422,7 @@ class TestAcknowledgeAll:
|
||||
assert response.data == {"result": 1}
|
||||
|
||||
def test_skips_pending_and_started(self, admin_client: APIClient) -> None:
|
||||
"""acknowledge_all/ does not touch PENDING or STARTED tasks."""
|
||||
PaperlessTaskFactory(status=PaperlessTask.Status.PENDING)
|
||||
PaperlessTaskFactory(status=PaperlessTask.Status.STARTED)
|
||||
|
||||
@@ -441,6 +432,7 @@ class TestAcknowledgeAll:
|
||||
assert response.data == {"result": 0}
|
||||
|
||||
def test_includes_revoked(self, admin_client: APIClient) -> None:
|
||||
"""acknowledge_all/ marks REVOKED tasks as acknowledged."""
|
||||
PaperlessTaskFactory(status=PaperlessTask.Status.REVOKED, acknowledged=False)
|
||||
|
||||
response = admin_client.post(ENDPOINT + "acknowledge_all/")
|
||||
@@ -452,6 +444,7 @@ class TestAcknowledgeAll:
|
||||
@pytest.mark.django_db()
|
||||
class TestSummary:
|
||||
def test_returns_per_type_totals(self, admin_client: APIClient) -> None:
|
||||
"""summary/ returns per-type counts of total, success, and failure tasks."""
|
||||
PaperlessTaskFactory(
|
||||
task_type=PaperlessTask.TaskType.CONSUME_FILE,
|
||||
status=PaperlessTask.Status.SUCCESS,
|
||||
@@ -478,6 +471,7 @@ class TestSummary:
|
||||
@pytest.mark.django_db()
|
||||
class TestActive:
|
||||
def test_returns_pending_and_started_only(self, admin_client: APIClient) -> None:
|
||||
"""active/ returns only tasks in PENDING or STARTED status."""
|
||||
PaperlessTaskFactory(status=PaperlessTask.Status.PENDING)
|
||||
PaperlessTaskFactory(status=PaperlessTask.Status.STARTED)
|
||||
PaperlessTaskFactory(status=PaperlessTask.Status.SUCCESS)
|
||||
@@ -494,6 +488,7 @@ class TestActive:
|
||||
}
|
||||
|
||||
def test_excludes_revoked_tasks_from_active(self, admin_client: APIClient) -> None:
|
||||
"""active/ excludes REVOKED tasks."""
|
||||
PaperlessTaskFactory(status=PaperlessTask.Status.REVOKED)
|
||||
|
||||
response = admin_client.get(ENDPOINT + "active/")
|
||||
@@ -505,6 +500,7 @@ class TestActive:
|
||||
@pytest.mark.django_db()
|
||||
class TestRun:
|
||||
def test_forbidden_for_regular_user(self, user_client: APIClient) -> None:
|
||||
"""Regular users without add_paperlesstask permission receive 403 from run/."""
|
||||
response = user_client.post(
|
||||
ENDPOINT + "run/",
|
||||
{"task_type": PaperlessTask.TaskType.TRAIN_CLASSIFIER},
|
||||
@@ -517,6 +513,7 @@ class TestRun:
|
||||
self,
|
||||
admin_client: APIClient,
|
||||
) -> None:
|
||||
"""run/ dispatches the task via apply_async with trigger_source=manual in headers."""
|
||||
fake_task_id = str(uuid.uuid4())
|
||||
mock_async_result = mock.Mock()
|
||||
mock_async_result.id = fake_task_id
|
||||
@@ -551,6 +548,7 @@ class TestRun:
|
||||
assert response.status_code == status.HTTP_400_BAD_REQUEST
|
||||
|
||||
def test_returns_400_for_invalid_task_type(self, admin_client: APIClient) -> None:
|
||||
"""run/ returns 400 for an unrecognized task_type value."""
|
||||
response = admin_client.post(
|
||||
ENDPOINT + "run/",
|
||||
{"task_type": "not_a_real_type"},
|
||||
@@ -563,6 +561,7 @@ class TestRun:
|
||||
self,
|
||||
admin_client: APIClient,
|
||||
) -> None:
|
||||
"""run/ dispatches sanity_check with raise_on_error=False and manual trigger header."""
|
||||
fake_task_id = str(uuid.uuid4())
|
||||
mock_async_result = mock.Mock()
|
||||
mock_async_result.id = fake_task_id
|
||||
|
||||
Reference in New Issue
Block a user