paperless-ngx/src/documents/tests/test_api_workflows.py
2025-10-26 06:51:51 +00:00

1253 lines
46 KiB
Python

import json
from django.contrib.auth.models import Group
from django.contrib.auth.models import User
from rest_framework import status
from rest_framework.test import APITestCase
from documents.data_models import DocumentSource
from documents.models import Correspondent
from documents.models import CustomField
from documents.models import DocumentType
from documents.models import StoragePath
from documents.models import Tag
from documents.models import Workflow
from documents.models import WorkflowAction
from documents.models import WorkflowTrigger
from documents.tests.utils import DirectoriesMixin
# API tests for the workflow, workflow trigger and workflow action endpoints.
class TestApiWorkflows(DirectoriesMixin, APITestCase):
# Base URLs of the three endpoints exercised by the tests in this class.
ENDPOINT = "/api/workflows/"
ENDPOINT_TRIGGERS = "/api/workflow_triggers/"
ENDPOINT_ACTIONS = "/api/workflow_actions/"
def setUp(self) -> None:
    """
    Authenticate the test client as a superuser and create the fixtures
    shared by every test: two extra users, a group, correspondents,
    document types, tags, storage paths, custom fields, and one workflow
    ("Workflow 1") that holds a single consumption trigger and a single
    action populated with assign_* values.
    """
    super().setUp()

    admin = User.objects.create_superuser(username="temp_admin")
    self.client.force_authenticate(user=admin)

    # Users and groups referenced by owner / permission assignments.
    self.user2 = User.objects.create(username="user2")
    self.user3 = User.objects.create(username="user3")
    self.group1 = Group.objects.create(name="group1")

    # Document metadata objects used as filter and assignment targets.
    self.c = Correspondent.objects.create(name="Correspondent Name")
    self.c2 = Correspondent.objects.create(name="Correspondent Name 2")
    self.dt = DocumentType.objects.create(name="DocType Name")
    self.dt2 = DocumentType.objects.create(name="DocType Name 2")
    self.t1 = Tag.objects.create(name="t1")
    self.t2 = Tag.objects.create(name="t2")
    self.t3 = Tag.objects.create(name="t3")
    self.sp = StoragePath.objects.create(name="Storage Path 1", path="/test/")
    self.sp2 = StoragePath.objects.create(name="Storage Path 2", path="/test2/")
    self.cf1 = CustomField.objects.create(
        name="Custom Field 1",
        data_type="string",
    )
    self.cf2 = CustomField.objects.create(
        name="Custom Field 2",
        data_type="integer",
    )

    # The trigger stores its sources as a comma-separated list of integers.
    trigger_sources = (
        DocumentSource.ApiUpload,
        DocumentSource.ConsumeFolder,
        DocumentSource.MailFetch,
    )
    self.trigger = WorkflowTrigger.objects.create(
        type=WorkflowTrigger.WorkflowTriggerType.CONSUMPTION,
        sources=",".join(str(int(source)) for source in trigger_sources),
        filter_filename="*simple*",
        filter_path="*/samples/*",
    )

    self.action = WorkflowAction.objects.create(
        assign_title="Doc from {correspondent}",
        assign_correspondent=self.c,
        assign_document_type=self.dt,
        assign_storage_path=self.sp,
        assign_owner=self.user2,
    )
    for tag in (self.t1, self.t2, self.t3):
        self.action.assign_tags.add(tag)
    self.action.assign_view_users.add(self.user3.pk)
    self.action.assign_view_groups.add(self.group1.pk)
    self.action.assign_change_users.add(self.user3.pk)
    self.action.assign_change_groups.add(self.group1.pk)
    for custom_field in (self.cf1, self.cf2):
        self.action.assign_custom_fields.add(custom_field.pk)
    self.action.save()

    self.workflow = Workflow.objects.create(name="Workflow 1", order=0)
    self.workflow.triggers.add(self.trigger)
    self.workflow.actions.add(self.action)
    self.workflow.save()
def test_api_get_workflow(self):
    """
    GIVEN:
        - The workflow created in setUp
    WHEN:
        - The workflow list endpoint is requested
    THEN:
        - HTTP 200 is returned with exactly one result
        - The result is that workflow and exposes its action's correspondent
    """
    listing = self.client.get(self.ENDPOINT, format="json")

    self.assertEqual(listing.status_code, status.HTTP_200_OK)
    self.assertEqual(listing.data["count"], 1)

    returned = listing.data["results"][0]
    self.assertEqual(returned["id"], self.workflow.id)

    expected_correspondent = self.action.assign_correspondent.pk
    self.assertEqual(
        returned["actions"][0]["assign_correspondent"],
        expected_correspondent,
    )
def test_api_create_workflow(self):
    """
    GIVEN:
        - A trigger and an action created through their own endpoints
    WHEN:
        - A workflow referencing both by id is POSTed
    THEN:
        - Each request returns HTTP 201
        - Two workflows exist afterwards
    """
    trigger_payload = {
        "type": WorkflowTrigger.WorkflowTriggerType.CONSUMPTION,
        "sources": [DocumentSource.ApiUpload],
        "filter_filename": "*",
    }
    trigger_response = self.client.post(
        self.ENDPOINT_TRIGGERS,
        json.dumps(trigger_payload),
        content_type="application/json",
    )
    self.assertEqual(trigger_response.status_code, status.HTTP_201_CREATED)

    action_payload = {"assign_title": "Action Title"}
    action_response = self.client.post(
        self.ENDPOINT_ACTIONS,
        json.dumps(action_payload),
        content_type="application/json",
    )
    self.assertEqual(action_response.status_code, status.HTTP_201_CREATED)

    # Reference the objects created above via the values the API returned.
    created_trigger = trigger_response.data
    created_action = action_response.data
    workflow_payload = {
        "name": "Workflow 2",
        "order": 1,
        "triggers": [
            {
                "id": created_trigger["id"],
                "sources": [DocumentSource.ApiUpload],
                "type": created_trigger["type"],
                "filter_filename": created_trigger["filter_filename"],
            },
        ],
        "actions": [
            {
                "id": created_action["id"],
                "assign_title": created_action["assign_title"],
            },
        ],
    }
    workflow_response = self.client.post(
        self.ENDPOINT,
        json.dumps(workflow_payload),
        content_type="application/json",
    )

    self.assertEqual(workflow_response.status_code, status.HTTP_201_CREATED)
    self.assertEqual(Workflow.objects.count(), 2)
def test_api_create_workflow_nested(self):
    """
    GIVEN:
        - One API request describing a workflow together with a nested
          trigger and two nested actions
    WHEN:
        - The request is POSTed to the workflow endpoint
    THEN:
        - HTTP 201 is returned and a second workflow exists
        - The stored trigger carries the submitted tag, correspondent,
          document type, storage path and custom field query filters
    """
    custom_field_query = json.dumps(
        ["AND", [[self.cf1.id, "exact", "value"]]],
    )
    nested_trigger = {
        "sources": [DocumentSource.ApiUpload],
        "type": WorkflowTrigger.WorkflowTriggerType.CONSUMPTION,
        "filter_filename": "*",
        "filter_path": "*/samples/*",
        "filter_has_tags": [self.t1.id],
        "filter_has_all_tags": [self.t2.id],
        "filter_has_not_tags": [self.t3.id],
        "filter_has_not_correspondents": [self.c2.id],
        "filter_has_not_document_types": [self.dt2.id],
        "filter_has_not_storage_paths": [self.sp2.id],
        "filter_custom_field_query": custom_field_query,
        "filter_has_document_type": self.dt.id,
        "filter_has_correspondent": self.c.id,
        "filter_has_storage_path": self.sp.id,
    }
    assignment_action = {
        "assign_title": "Action Title",
        "assign_tags": [self.t2.id],
        "assign_document_type": self.dt2.id,
        "assign_correspondent": self.c2.id,
        "assign_storage_path": self.sp2.id,
        "assign_owner": self.user2.id,
        "assign_view_users": [self.user2.id],
        "assign_view_groups": [self.group1.id],
        "assign_change_users": [self.user2.id],
        "assign_change_groups": [self.group1.id],
        "assign_custom_fields": [self.cf2.id],
    }
    removal_action = {
        "type": WorkflowAction.WorkflowActionType.REMOVAL,
        "remove_tags": [self.t3.id],
        "remove_document_types": [self.dt.id],
        "remove_correspondents": [self.c.id],
        "remove_storage_paths": [self.sp.id],
        "remove_custom_fields": [self.cf1.id],
        "remove_owners": [self.user2.id],
        "remove_view_users": [self.user3.id],
        "remove_change_users": [self.user3.id],
        "remove_view_groups": [self.group1.id],
        "remove_change_groups": [self.group1.id],
    }
    payload = {
        "name": "Workflow 2",
        "order": 1,
        "triggers": [nested_trigger],
        "actions": [assignment_action, removal_action],
    }

    response = self.client.post(
        self.ENDPOINT,
        json.dumps(payload),
        content_type="application/json",
    )

    self.assertEqual(response.status_code, status.HTTP_201_CREATED)
    self.assertEqual(Workflow.objects.count(), 2)

    stored_trigger = Workflow.objects.get(name="Workflow 2").triggers.first()

    # Each many-to-many filter must contain exactly the one submitted object.
    expected_relations = [
        (stored_trigger.filter_has_tags, self.t1),
        (stored_trigger.filter_has_all_tags, self.t2),
        (stored_trigger.filter_has_not_tags, self.t3),
        (stored_trigger.filter_has_not_correspondents, self.c2),
        (stored_trigger.filter_has_not_document_types, self.dt2),
        (stored_trigger.filter_has_not_storage_paths, self.sp2),
    ]
    for relation, expected_obj in expected_relations:
        self.assertSetEqual(
            set(relation.values_list("id", flat=True)),
            {expected_obj.id},
        )

    self.assertEqual(
        stored_trigger.filter_custom_field_query,
        custom_field_query,
    )
def test_api_create_invalid_workflow_trigger(self):
    """
    GIVEN:
        - A trigger payload that omits the type
        - A consumption trigger payload that has sources but neither a
          filename nor a path filter
    WHEN:
        - Each payload is POSTed to the trigger endpoint
    THEN:
        - Both requests are answered with HTTP 400
        - Only the trigger from setUp exists afterwards
    """
    invalid_payloads = [
        {
            "sources": [DocumentSource.ApiUpload],
        },
        {
            "type": WorkflowTrigger.WorkflowTriggerType.CONSUMPTION,
            "sources": [DocumentSource.ApiUpload],
        },
    ]

    for payload in invalid_payloads:
        rejected = self.client.post(
            self.ENDPOINT_TRIGGERS,
            json.dumps(payload),
            content_type="application/json",
        )
        self.assertEqual(rejected.status_code, status.HTTP_400_BAD_REQUEST)

    self.assertEqual(WorkflowTrigger.objects.count(), 1)
def test_api_create_invalid_assign_title(self):
    """
    GIVEN:
        - A workflow payload whose action has a malformed assign_title
          placeholder ("{created_year]")
    WHEN:
        - The payload is POSTed to the workflow endpoint
    THEN:
        - HTTP 400 is returned with an "Invalid f-string detected" error
          on the action's assign_title
        - No additional workflow is created
    """
    payload = {
        "name": "Workflow 1",
        "order": 1,
        "triggers": [
            {"type": WorkflowTrigger.WorkflowTriggerType.DOCUMENT_UPDATED},
        ],
        "actions": [
            {"assign_title": "{created_year]"},
        ],
    }

    response = self.client.post(
        self.ENDPOINT,
        json.dumps(payload),
        content_type="application/json",
    )

    self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
    title_errors = response.data["actions"][0]["assign_title"]
    self.assertIn("Invalid f-string detected", title_errors[0])
    self.assertEqual(Workflow.objects.count(), 1)
def test_api_create_workflow_trigger_action_empty_fields(self):
    """
    GIVEN:
        - Trigger and action payloads in which the path filter, the
          filename filter or the assign title is an empty string
    WHEN:
        - Each payload is POSTed to its endpoint
    THEN:
        - Every object is created (HTTP 201)
        - The fields submitted as "" are stored as None, the others as given
    """

    def post_json(endpoint, payload):
        # All requests in this test share the same JSON encoding.
        return self.client.post(
            endpoint,
            json.dumps(payload),
            content_type="application/json",
        )

    # Trigger with an empty path filter.
    response = post_json(
        self.ENDPOINT_TRIGGERS,
        {
            "type": WorkflowTrigger.WorkflowTriggerType.CONSUMPTION,
            "sources": [DocumentSource.ApiUpload],
            "filter_filename": "*test*",
            "filter_path": "",
        },
    )
    self.assertEqual(response.status_code, status.HTTP_201_CREATED)
    filename_only = WorkflowTrigger.objects.get(id=response.data["id"])
    self.assertEqual(filename_only.filter_filename, "*test*")
    self.assertIsNone(filename_only.filter_path)

    # Action with an empty title.
    response = post_json(self.ENDPOINT_ACTIONS, {"assign_title": ""})
    self.assertEqual(response.status_code, status.HTTP_201_CREATED)
    untitled = WorkflowAction.objects.get(id=response.data["id"])
    self.assertIsNone(untitled.assign_title)

    # Trigger with an empty filename filter.
    response = post_json(
        self.ENDPOINT_TRIGGERS,
        {
            "type": WorkflowTrigger.WorkflowTriggerType.CONSUMPTION,
            "sources": [DocumentSource.ApiUpload],
            "filter_filename": "",
            "filter_path": "*/test/*",
        },
    )
    self.assertEqual(response.status_code, status.HTTP_201_CREATED)
    path_only = WorkflowTrigger.objects.get(id=response.data["id"])
    self.assertEqual(path_only.filter_path, "*/test/*")
    self.assertIsNone(path_only.filter_filename)
def test_api_update_workflow_nested_triggers_actions(self):
    """
    GIVEN:
        - The workflow from setUp with its trigger and action
    WHEN:
        - The workflow is PATCHed with a new name plus a nested trigger
          and a nested action
    THEN:
        - HTTP 200 is returned
        - The workflow's name, trigger filters and action title match the
          submitted values
    """
    custom_field_query = json.dumps(
        ["AND", [[self.cf1.id, "exact", "value"]]],
    )
    patch_body = {
        "name": "Workflow Updated",
        "order": 1,
        "triggers": [
            {
                "type": WorkflowTrigger.WorkflowTriggerType.DOCUMENT_ADDED,
                "filter_has_tags": [self.t1.id],
                "filter_has_all_tags": [self.t2.id],
                "filter_has_not_tags": [self.t3.id],
                "filter_has_not_correspondents": [self.c2.id],
                "filter_has_not_document_types": [self.dt2.id],
                "filter_has_not_storage_paths": [self.sp2.id],
                "filter_custom_field_query": custom_field_query,
                "filter_has_correspondent": self.c.id,
                "filter_has_document_type": self.dt.id,
            },
        ],
        "actions": [
            {"assign_title": "Action New Title"},
        ],
    }

    response = self.client.patch(
        f"{self.ENDPOINT}{self.workflow.id}/",
        json.dumps(patch_body),
        content_type="application/json",
    )

    self.assertEqual(response.status_code, status.HTTP_200_OK)
    updated = Workflow.objects.get(id=response.data["id"])
    self.assertEqual(updated.name, "Workflow Updated")

    stored_trigger = updated.triggers.first()
    # The first entry of each filter relation must be the submitted object.
    expected_first = [
        (stored_trigger.filter_has_tags, self.t1),
        (stored_trigger.filter_has_all_tags, self.t2),
        (stored_trigger.filter_has_not_tags, self.t3),
        (stored_trigger.filter_has_not_correspondents, self.c2),
        (stored_trigger.filter_has_not_document_types, self.dt2),
        (stored_trigger.filter_has_not_storage_paths, self.sp2),
    ]
    for relation, expected_obj in expected_first:
        self.assertEqual(relation.first(), expected_obj)

    self.assertEqual(
        stored_trigger.filter_custom_field_query,
        custom_field_query,
    )
    self.assertEqual(updated.actions.first().assign_title, "Action New Title")
def test_api_update_workflow_no_trigger_actions(self):
    """
    GIVEN:
        - The workflow from setUp with one trigger and one action
    WHEN:
        - It is PATCHed without "triggers" and "actions" keys
        - It is PATCHed again with an empty "actions" list and no
          "triggers" key
    THEN:
        - After the first request the name changes while the trigger and
          the action remain
        - After the second request the action is gone but the trigger
          remains
    """
    detail_url = f"{self.ENDPOINT}{self.workflow.id}/"

    def patch_workflow(body):
        return self.client.patch(
            detail_url,
            json.dumps(body),
            content_type="application/json",
        )

    # Omitting both nested lists must leave them untouched.
    response = patch_workflow({"name": "Workflow Updated", "order": 1})
    self.assertEqual(response.status_code, status.HTTP_200_OK)
    reloaded = Workflow.objects.get(id=self.workflow.id)
    self.assertEqual(reloaded.name, "Workflow Updated")
    self.assertEqual(reloaded.triggers.count(), 1)
    self.assertEqual(reloaded.actions.count(), 1)

    # An explicit empty list clears the actions only.
    response = patch_workflow(
        {"name": "Workflow Updated 2", "order": 1, "actions": []},
    )
    self.assertEqual(response.status_code, status.HTTP_200_OK)
    reloaded = Workflow.objects.get(id=self.workflow.id)
    self.assertEqual(reloaded.name, "Workflow Updated 2")
    self.assertEqual(reloaded.triggers.count(), 1)
    self.assertEqual(reloaded.actions.count(), 0)
def test_api_auto_remove_orphaned_triggers_actions(self):
    """
    GIVEN:
        - The trigger and action from setUp attached to the workflow
    WHEN:
        - The workflow is PATCHed with a new nested trigger and action
          (no ids given)
    THEN:
        - Exactly one trigger and one action exist afterwards
        - They are not the ones created in setUp
    """
    replacement = {
        "name": "Workflow Updated",
        "order": 1,
        "triggers": [
            {
                "type": WorkflowTrigger.WorkflowTriggerType.DOCUMENT_ADDED,
                "filter_has_tags": [self.t1.id],
                "filter_has_correspondent": self.c.id,
                "filter_has_document_type": self.dt.id,
            },
        ],
        "actions": [
            {"assign_title": "Action New Title"},
        ],
    }

    response = self.client.patch(
        f"{self.ENDPOINT}{self.workflow.id}/",
        json.dumps(replacement),
        content_type="application/json",
    )

    self.assertEqual(response.status_code, status.HTTP_200_OK)
    updated = Workflow.objects.get(id=response.data["id"])

    self.assertEqual(WorkflowTrigger.objects.count(), 1)
    self.assertNotEqual(updated.triggers.first().id, self.trigger.id)

    self.assertEqual(WorkflowAction.objects.count(), 1)
    self.assertNotEqual(updated.actions.first().id, self.action.id)
def test_email_action_validation(self):
    """
    GIVEN:
        - Workflow payloads containing an email action with no email
          section, with subject and body only, and with a recipient added
    WHEN:
        - Each payload is POSTed to the workflow endpoint
    THEN:
        - The first two are rejected with HTTP 400
        - The one that includes a recipient is created (HTTP 201)
    """

    def create_with_action(action):
        # Only the action differs between the three requests.
        payload = {
            "name": "Workflow 2",
            "order": 1,
            "triggers": [
                {
                    "type": WorkflowTrigger.WorkflowTriggerType.CONSUMPTION,
                    "sources": [DocumentSource.ApiUpload],
                    "filter_filename": "*",
                },
            ],
            "actions": [action],
        }
        return self.client.post(
            self.ENDPOINT,
            json.dumps(payload),
            content_type="application/json",
        )

    email_type = WorkflowAction.WorkflowActionType.EMAIL

    # No email section at all.
    response = create_with_action({"type": email_type})
    self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)

    # Subject and body present, recipient missing.
    response = create_with_action(
        {
            "type": email_type,
            "email": {
                "subject": "Subject",
                "body": "Body",
            },
        },
    )
    self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)

    # Subject, body and recipient present.
    response = create_with_action(
        {
            "type": email_type,
            "email": {
                "subject": "Subject",
                "body": "Body",
                "to": "me@example.com",
                "include_document": False,
            },
        },
    )
    self.assertEqual(response.status_code, status.HTTP_201_CREATED)
def test_webhook_action_validation(self):
    """
    GIVEN:
        - Workflow payloads containing a webhook action, once without a
          webhook section and once with a URL
    WHEN:
        - Each payload is POSTed to the workflow endpoint
    THEN:
        - The payload without a webhook section is rejected with HTTP 400
        - The payload with a URL is created (HTTP 201)
    """

    def create_with_action(action):
        # Only the action differs between the two requests.
        payload = {
            "name": "Workflow 2",
            "order": 1,
            "triggers": [
                {
                    "type": WorkflowTrigger.WorkflowTriggerType.CONSUMPTION,
                    "sources": [DocumentSource.ApiUpload],
                    "filter_filename": "*",
                },
            ],
            "actions": [action],
        }
        return self.client.post(
            self.ENDPOINT,
            json.dumps(payload),
            content_type="application/json",
        )

    webhook_type = WorkflowAction.WorkflowActionType.WEBHOOK

    # No webhook section, hence no URL.
    response = create_with_action({"type": webhook_type})
    self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)

    response = create_with_action(
        {
            "type": webhook_type,
            "webhook": {
                "url": "https://example.com",
                "include_document": False,
            },
        },
    )
    self.assertEqual(response.status_code, status.HTTP_201_CREATED)
def test_webhook_action_url_validation(self):
    """
    GIVEN:
        - API requests to create a workflow with a webhook action
        - One request uses an https URL whose host has no TLD, the other
          a file:// URL
    WHEN:
        - API is called
    THEN:
        - The https URL is accepted (HTTP 201)
        - The file:// URL is rejected (HTTP 400)
    """
    cases = [
        ("https://examplewithouttld:3000/path", status.HTTP_201_CREATED),
        ("file:///etc/passwd/path", status.HTTP_400_BAD_REQUEST),
    ]
    for index, (url, expected_resp_code) in enumerate(cases, start=2):
        # subTest names the failing URL and lets the remaining cases still run.
        with self.subTest(url=url):
            response = self.client.post(
                self.ENDPOINT,
                json.dumps(
                    {
                        # Give each case its own workflow name so that a
                        # rejection cannot be caused by a clash with the
                        # workflow created by an earlier case.
                        "name": f"Workflow {index}",
                        "order": 1,
                        "triggers": [
                            {
                                "type": WorkflowTrigger.WorkflowTriggerType.CONSUMPTION,
                                "sources": [DocumentSource.ApiUpload],
                                "filter_filename": "*",
                            },
                        ],
                        "actions": [
                            {
                                "type": WorkflowAction.WorkflowActionType.WEBHOOK,
                                "webhook": {
                                    "url": url,
                                    "include_document": False,
                                },
                            },
                        ],
                    },
                ),
                content_type="application/json",
            )
            self.assertEqual(response.status_code, expected_resp_code)
def test_patch_trigger_cannot_change_id(self):
    """
    GIVEN:
        - An existing workflow trigger
        - An existing workflow action
    WHEN:
        - PATCHing the trigger with a different 'id' in the body
        - PATCHing the trigger with its own 'id' in the body
        - PATCHing the action with a different 'id' in the body
        - PATCHing the action with its own 'id' in the body
    THEN:
        - A mismatching 'id' yields HTTP 400 and the object is unchanged
        - A matching 'id' yields HTTP 200 and the change is applied
    """
    # Build the detail URLs from the class-level endpoint constants so this
    # test follows the same convention as the rest of the class.
    trigger_url = f"{self.ENDPOINT_TRIGGERS}{self.trigger.id}/"
    action_url = f"{self.ENDPOINT_ACTIONS}{self.action.id}/"

    # Trigger: mismatching id is refused and nothing is written.
    response = self.client.patch(
        trigger_url,
        {
            "id": self.trigger.id + 1,
            "filter_filename": "patched.pdf",
        },
        format="json",
    )
    self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
    self.trigger.refresh_from_db()
    self.assertNotEqual(self.trigger.filter_filename, "patched.pdf")

    # Trigger: matching id is accepted.
    response = self.client.patch(
        trigger_url,
        {
            "id": self.trigger.id,
            "filter_filename": "patched.pdf",
        },
        format="json",
    )
    self.assertEqual(response.status_code, status.HTTP_200_OK)
    self.trigger.refresh_from_db()
    self.assertEqual(self.trigger.filter_filename, "patched.pdf")

    # Action: mismatching id is refused and nothing is written.
    response = self.client.patch(
        action_url,
        {
            "id": self.action.id + 1,
            "assign_title": "Patched Title",
        },
        format="json",
    )
    self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
    self.action.refresh_from_db()
    self.assertNotEqual(self.action.assign_title, "Patched Title")

    # Action: matching id is accepted.
    response = self.client.patch(
        action_url,
        {
            "id": self.action.id,
            "assign_title": "Patched Title",
        },
        format="json",
    )
    self.assertEqual(response.status_code, status.HTTP_200_OK)
    self.action.refresh_from_db()
    self.assertEqual(self.action.assign_title, "Patched Title")
def test_deletion_action_validation(self):
    """
    GIVEN:
        - Two workflow payloads whose only action is a deletion action,
          one with skip_trash False and one with skip_trash True
    WHEN:
        - Each payload is POSTed to the workflow endpoint
    THEN:
        - Both workflows are created (HTTP 201)
    """
    variants = [
        ("Workflow 2", 1, False),
        ("Workflow 3", 2, True),
    ]

    for workflow_name, position, skip_trash in variants:
        payload = {
            "name": workflow_name,
            "order": position,
            "triggers": [
                {
                    "type": WorkflowTrigger.WorkflowTriggerType.CONSUMPTION,
                    "sources": [DocumentSource.ApiUpload],
                    "filter_filename": "*",
                },
            ],
            "actions": [
                {
                    "type": WorkflowAction.WorkflowActionType.DELETION,
                    "deletion": {
                        "skip_trash": skip_trash,
                    },
                },
            ],
        }
        created = self.client.post(
            self.ENDPOINT,
            json.dumps(payload),
            content_type="application/json",
        )
        self.assertEqual(created.status_code, status.HTTP_201_CREATED)
def test_deletion_action_as_last_action_valid(self):
    """
    GIVEN:
        - A workflow payload with an assignment, a removal and a deletion
          action, in that order
    WHEN:
        - The payload is POSTed to the workflow endpoint
    THEN:
        - The workflow is created (HTTP 201)
    """
    action_types = WorkflowAction.WorkflowActionType
    ordered_actions = [
        {
            "type": action_types.ASSIGNMENT,
            "assign_title": "Assigned Title",
        },
        {
            "type": action_types.REMOVAL,
            "remove_all_tags": True,
        },
        {
            "type": action_types.DELETION,
            "deletion": {
                "skip_trash": False,
            },
        },
    ]
    payload = {
        "name": "Workflow with Deletion Last",
        "order": 1,
        "triggers": [
            {
                "type": WorkflowTrigger.WorkflowTriggerType.CONSUMPTION,
                "sources": [DocumentSource.ApiUpload],
                "filter_filename": "*",
            },
        ],
        "actions": ordered_actions,
    }

    created = self.client.post(
        self.ENDPOINT,
        json.dumps(payload),
        content_type="application/json",
    )

    self.assertEqual(created.status_code, status.HTTP_201_CREATED)
def test_deletion_action_in_middle_invalid(self):
    """
    GIVEN:
        - A workflow payload whose deletion action is followed by an
          assignment action
    WHEN:
        - The payload is POSTed to the workflow endpoint
    THEN:
        - HTTP 400 is returned
        - The response mentions that the delete action must be last
    """
    action_types = WorkflowAction.WorkflowActionType
    misordered_actions = [
        {
            "type": action_types.DELETION,
            "deletion": {
                "skip_trash": False,
            },
        },
        {
            "type": action_types.ASSIGNMENT,
            "assign_title": "After Deletion",
        },
    ]
    payload = {
        "name": "Workflow with Deletion in Middle",
        "order": 1,
        "triggers": [
            {
                "type": WorkflowTrigger.WorkflowTriggerType.CONSUMPTION,
                "sources": [DocumentSource.ApiUpload],
                "filter_filename": "*",
            },
        ],
        "actions": misordered_actions,
    }

    rejected = self.client.post(
        self.ENDPOINT,
        json.dumps(payload),
        content_type="application/json",
    )

    self.assertEqual(rejected.status_code, status.HTTP_400_BAD_REQUEST)
    self.assertIn(
        "Delete action must be the last action in the workflow",
        str(rejected.data),
    )
def test_multiple_deletion_actions_invalid(self):
    """
    GIVEN:
        - A workflow payload containing two deletion actions
    WHEN:
        - The payload is POSTed to the workflow endpoint
    THEN:
        - HTTP 400 is returned
        - The response mentions that the delete action must be last
          (the first deletion action is not in the last position)
    """
    deletion_type = WorkflowAction.WorkflowActionType.DELETION
    two_deletions = [
        {
            "type": deletion_type,
            "deletion": {
                "skip_trash": skip_trash,
            },
        }
        for skip_trash in (False, True)
    ]
    payload = {
        "name": "Workflow with Multiple Deletions",
        "order": 1,
        "triggers": [
            {
                "type": WorkflowTrigger.WorkflowTriggerType.CONSUMPTION,
                "sources": [DocumentSource.ApiUpload],
                "filter_filename": "*",
            },
        ],
        "actions": two_deletions,
    }

    rejected = self.client.post(
        self.ENDPOINT,
        json.dumps(payload),
        content_type="application/json",
    )

    self.assertEqual(rejected.status_code, status.HTTP_400_BAD_REQUEST)
    self.assertIn(
        "Delete action must be the last action in the workflow",
        str(rejected.data),
    )
def test_update_workflow_add_action_after_deletion_invalid(self):
    """
    GIVEN:
        - A workflow created with a single deletion action
    WHEN:
        - It is PATCHed with the deletion action followed by an
          assignment action
    THEN:
        - HTTP 400 is returned
        - The response mentions that the delete action must be last
    """
    action_types = WorkflowAction.WorkflowActionType
    deletion = {
        "type": action_types.DELETION,
        "deletion": {
            "skip_trash": False,
        },
    }
    trailing_assignment = {
        "type": action_types.ASSIGNMENT,
        "assign_title": "After Deletion",
    }

    created = self.client.post(
        self.ENDPOINT,
        json.dumps(
            {
                "name": "Workflow to Update",
                "order": 1,
                "triggers": [
                    {
                        "type": WorkflowTrigger.WorkflowTriggerType.CONSUMPTION,
                        "sources": [DocumentSource.ApiUpload],
                        "filter_filename": "*",
                    },
                ],
                "actions": [deletion],
            },
        ),
        content_type="application/json",
    )
    self.assertEqual(created.status_code, status.HTTP_201_CREATED)
    workflow_id = created.data["id"]

    rejected = self.client.patch(
        f"{self.ENDPOINT}{workflow_id}/",
        json.dumps({"actions": [deletion, trailing_assignment]}),
        content_type="application/json",
    )

    self.assertEqual(rejected.status_code, status.HTTP_400_BAD_REQUEST)
    self.assertIn(
        "Delete action must be the last action in the workflow",
        str(rejected.data),
    )
def test_update_workflow_reorder_deletion_to_middle_invalid(self):
    """
    GIVEN:
        - A workflow created with an assignment action followed by a
          deletion action
    WHEN:
        - It is PATCHed with a deletion action followed by an assignment
          action
    THEN:
        - HTTP 400 is returned
        - The response mentions that the delete action must be last
    """
    action_types = WorkflowAction.WorkflowActionType
    deletion = {
        "type": action_types.DELETION,
        "deletion": {
            "skip_trash": False,
        },
    }

    def assignment(title):
        return {
            "type": action_types.ASSIGNMENT,
            "assign_title": title,
        }

    created = self.client.post(
        self.ENDPOINT,
        json.dumps(
            {
                "name": "Workflow to Reorder",
                "order": 1,
                "triggers": [
                    {
                        "type": WorkflowTrigger.WorkflowTriggerType.CONSUMPTION,
                        "sources": [DocumentSource.ApiUpload],
                        "filter_filename": "*",
                    },
                ],
                "actions": [assignment("First"), deletion],
            },
        ),
        content_type="application/json",
    )
    self.assertEqual(created.status_code, status.HTTP_201_CREATED)
    workflow_id = created.data["id"]

    rejected = self.client.patch(
        f"{self.ENDPOINT}{workflow_id}/",
        json.dumps({"actions": [deletion, assignment("Second")]}),
        content_type="application/json",
    )

    self.assertEqual(rejected.status_code, status.HTTP_400_BAD_REQUEST)
    self.assertIn(
        "Delete action must be the last action in the workflow",
        str(rejected.data),
    )
def test_update_workflow_add_deletion_at_end_valid(self):
    """
    GIVEN:
        - A workflow created with a single assignment action
    WHEN:
        - It is PATCHed with that assignment action followed by a
          deletion action
    THEN:
        - The update succeeds (HTTP 200)
    """
    action_types = WorkflowAction.WorkflowActionType
    first_action = {
        "type": action_types.ASSIGNMENT,
        "assign_title": "First Action",
    }
    appended_deletion = {
        "type": action_types.DELETION,
        "deletion": {
            "skip_trash": False,
        },
    }

    created = self.client.post(
        self.ENDPOINT,
        json.dumps(
            {
                "name": "Workflow to Add Deletion",
                "order": 1,
                "triggers": [
                    {
                        "type": WorkflowTrigger.WorkflowTriggerType.CONSUMPTION,
                        "sources": [DocumentSource.ApiUpload],
                        "filter_filename": "*",
                    },
                ],
                "actions": [first_action],
            },
        ),
        content_type="application/json",
    )
    self.assertEqual(created.status_code, status.HTTP_201_CREATED)
    workflow_id = created.data["id"]

    updated = self.client.patch(
        f"{self.ENDPOINT}{workflow_id}/",
        json.dumps({"actions": [first_action, appended_deletion]}),
        content_type="application/json",
    )

    self.assertEqual(updated.status_code, status.HTTP_200_OK)
def test_update_workflow_remove_deletion_action_valid(self):
    """
    GIVEN:
        - A workflow created with an assignment action followed by a
          deletion action
    WHEN:
        - It is PATCHed with a single assignment action only
    THEN:
        - The update succeeds (HTTP 200)
    """
    action_types = WorkflowAction.WorkflowActionType
    initial_actions = [
        {
            "type": action_types.ASSIGNMENT,
            "assign_title": "First Action",
        },
        {
            "type": action_types.DELETION,
            "deletion": {
                "skip_trash": False,
            },
        },
    ]
    remaining_actions = [
        {
            "type": action_types.ASSIGNMENT,
            "assign_title": "Only Action",
        },
    ]

    created = self.client.post(
        self.ENDPOINT,
        json.dumps(
            {
                "name": "Workflow to Remove Deletion",
                "order": 1,
                "triggers": [
                    {
                        "type": WorkflowTrigger.WorkflowTriggerType.CONSUMPTION,
                        "sources": [DocumentSource.ApiUpload],
                        "filter_filename": "*",
                    },
                ],
                "actions": initial_actions,
            },
        ),
        content_type="application/json",
    )
    self.assertEqual(created.status_code, status.HTTP_201_CREATED)
    workflow_id = created.data["id"]

    updated = self.client.patch(
        f"{self.ENDPOINT}{workflow_id}/",
        json.dumps({"actions": remaining_actions}),
        content_type="application/json",
    )

    self.assertEqual(updated.status_code, status.HTTP_200_OK)