diff --git a/src/documents/signals/handlers.py b/src/documents/signals/handlers.py index e410b54e2..6cafbc1f8 100644 --- a/src/documents/signals/handlers.py +++ b/src/documents/signals/handlers.py @@ -1,14 +1,10 @@ from __future__ import annotations -import ipaddress import logging import shutil -import socket from pathlib import Path from typing import TYPE_CHECKING -from urllib.parse import urlparse -import httpx from celery import shared_task from celery import states from celery.signals import before_task_publish @@ -27,20 +23,15 @@ from django.db.models import Q from django.dispatch import receiver from django.utils import timezone from filelock import FileLock -from guardian.shortcuts import remove_perm from documents import matching from documents.caching import clear_document_caches from documents.file_handling import create_source_path_directory from documents.file_handling import delete_empty_directories from documents.file_handling import generate_unique_filename -from documents.mail import EmailAttachment -from documents.mail import send_email -from documents.models import Correspondent from documents.models import CustomField from documents.models import CustomFieldInstance from documents.models import Document -from documents.models import DocumentType from documents.models import MatchingModel from documents.models import PaperlessTask from documents.models import SavedView @@ -51,8 +42,14 @@ from documents.models import WorkflowAction from documents.models import WorkflowRun from documents.models import WorkflowTrigger from documents.permissions import get_objects_for_user_owner_aware -from documents.permissions import set_permissions_for_object -from documents.templating.workflows import parse_w_workflow_placeholders +from documents.workflows.actions import build_workflow_action_context +from documents.workflows.actions import execute_email_action +from documents.workflows.actions import execute_webhook_action +from documents.workflows.mutations 
import apply_assignment_to_document +from documents.workflows.mutations import apply_assignment_to_overrides +from documents.workflows.mutations import apply_removal_to_document +from documents.workflows.mutations import apply_removal_to_overrides +from documents.workflows.utils import get_workflows_for_trigger if TYPE_CHECKING: from documents.classifier import DocumentClassifier @@ -673,92 +670,6 @@ def run_workflows_updated(sender, document: Document, logging_group=None, **kwar ) -def _is_public_ip(ip: str) -> bool: - try: - obj = ipaddress.ip_address(ip) - return not ( - obj.is_private - or obj.is_loopback - or obj.is_link_local - or obj.is_multicast - or obj.is_unspecified - ) - except ValueError: # pragma: no cover - return False - - -def _resolve_first_ip(host: str) -> str | None: - try: - info = socket.getaddrinfo(host, None) - return info[0][4][0] if info else None - except Exception: # pragma: no cover - return None - - -@shared_task( - retry_backoff=True, - autoretry_for=(httpx.HTTPStatusError,), - max_retries=3, - throws=(httpx.HTTPError,), -) -def send_webhook( - url: str, - data: str | dict, - headers: dict, - files: dict, - *, - as_json: bool = False, -): - p = urlparse(url) - if p.scheme.lower() not in settings.WEBHOOKS_ALLOWED_SCHEMES or not p.hostname: - logger.warning("Webhook blocked: invalid scheme/hostname") - raise ValueError("Invalid URL scheme or hostname.") - - port = p.port or (443 if p.scheme == "https" else 80) - if ( - len(settings.WEBHOOKS_ALLOWED_PORTS) > 0 - and port not in settings.WEBHOOKS_ALLOWED_PORTS - ): - logger.warning("Webhook blocked: port not permitted") - raise ValueError("Destination port not permitted.") - - ip = _resolve_first_ip(p.hostname) - if not ip or ( - not _is_public_ip(ip) and not settings.WEBHOOKS_ALLOW_INTERNAL_REQUESTS - ): - logger.warning("Webhook blocked: destination not allowed") - raise ValueError("Destination host is not allowed.") - - try: - post_args = { - "url": url, - "headers": { - k: v for k, v 
in (headers or {}).items() if k.lower() != "host" - }, - "files": files or None, - "timeout": 5.0, - "follow_redirects": False, - } - if as_json: - post_args["json"] = data - elif isinstance(data, dict): - post_args["data"] = data - else: - post_args["content"] = data - - httpx.post( - **post_args, - ).raise_for_status() - logger.info( - f"Webhook sent to {url}", - ) - except Exception as e: - logger.error( - f"Failed attempt sending webhook to {url}: {e}", - ) - raise e - - def run_workflows( trigger_type: WorkflowTrigger.WorkflowTriggerType, document: Document | ConsumableDocument, @@ -767,572 +678,16 @@ def run_workflows( overrides: DocumentMetadataOverrides | None = None, original_file: Path | None = None, ) -> tuple[DocumentMetadataOverrides, str] | None: - """Run workflows which match a Document (or ConsumableDocument) for a specific trigger type or a single workflow if given. - - Assignment or removal actions are either applied directly to the document or an overrides object. If an overrides - object is provided, the function returns the object with the applied changes or None if no actions were applied and a string - of messages for each action. If no overrides object is provided, the changes are applied directly to the document and the - function returns None. """ + Execute workflows matching a document for the given trigger. When `overrides` is provided + (consumption flow), actions mutate that object and the function returns `(overrides, messages)`. + Otherwise actions mutate the actual document and return nothing. - def assignment_action(): - if action.assign_tags.exists(): - tag_ids_to_add: set[int] = set() - for tag in action.assign_tags.all(): - tag_ids_to_add.add(tag.pk) - tag_ids_to_add.update(int(pk) for pk in tag.get_ancestors_pks()) + Attachments for email/webhook actions use `original_file` when given, otherwise fall back to + `document.source_path` (Document) or `document.original_file` (ConsumableDocument). 
- if not use_overrides: - doc_tag_ids[:] = list(set(doc_tag_ids) | tag_ids_to_add) - else: - if overrides.tag_ids is None: - overrides.tag_ids = [] - overrides.tag_ids = list(set(overrides.tag_ids) | tag_ids_to_add) - - if action.assign_correspondent: - if not use_overrides: - document.correspondent = action.assign_correspondent - else: - overrides.correspondent_id = action.assign_correspondent.pk - - if action.assign_document_type: - if not use_overrides: - document.document_type = action.assign_document_type - else: - overrides.document_type_id = action.assign_document_type.pk - - if action.assign_storage_path: - if not use_overrides: - document.storage_path = action.assign_storage_path - else: - overrides.storage_path_id = action.assign_storage_path.pk - - if action.assign_owner: - if not use_overrides: - document.owner = action.assign_owner - else: - overrides.owner_id = action.assign_owner.pk - - if action.assign_title: - if not use_overrides: - try: - document.title = parse_w_workflow_placeholders( - action.assign_title, - document.correspondent.name if document.correspondent else "", - document.document_type.name if document.document_type else "", - document.owner.username if document.owner else "", - timezone.localtime(document.added), - document.original_filename or "", - document.filename or "", - document.created, - ) - except Exception: - logger.exception( - f"Error occurred parsing title assignment '{action.assign_title}', falling back to original", - extra={"group": logging_group}, - ) - else: - overrides.title = action.assign_title - - if any( - [ - action.assign_view_users.exists(), - action.assign_view_groups.exists(), - action.assign_change_users.exists(), - action.assign_change_groups.exists(), - ], - ): - permissions = { - "view": { - "users": action.assign_view_users.values_list("id", flat=True), - "groups": action.assign_view_groups.values_list("id", flat=True), - }, - "change": { - "users": action.assign_change_users.values_list("id", 
flat=True), - "groups": action.assign_change_groups.values_list("id", flat=True), - }, - } - if not use_overrides: - set_permissions_for_object( - permissions=permissions, - object=document, - merge=True, - ) - else: - overrides.view_users = list( - set( - (overrides.view_users or []) - + list(permissions["view"]["users"]), - ), - ) - overrides.view_groups = list( - set( - (overrides.view_groups or []) - + list(permissions["view"]["groups"]), - ), - ) - overrides.change_users = list( - set( - (overrides.change_users or []) - + list(permissions["change"]["users"]), - ), - ) - overrides.change_groups = list( - set( - (overrides.change_groups or []) - + list(permissions["change"]["groups"]), - ), - ) - - if action.assign_custom_fields.exists(): - if not use_overrides: - for field in action.assign_custom_fields.all(): - value_field_name = CustomFieldInstance.get_value_field_name( - data_type=field.data_type, - ) - args = { - value_field_name: action.assign_custom_fields_values.get( - str(field.pk), - None, - ), - } - # for some reason update_or_create doesn't work here - instance = CustomFieldInstance.objects.filter( - field=field, - document=document, - ).first() - if instance and args[value_field_name] is not None: - setattr(instance, value_field_name, args[value_field_name]) - instance.save() - elif not instance: - CustomFieldInstance.objects.create( - **args, - field=field, - document=document, - ) - else: - if overrides.custom_fields is None: - overrides.custom_fields = {} - overrides.custom_fields.update( - { - field.pk: action.assign_custom_fields_values.get( - str(field.pk), - None, - ) - for field in action.assign_custom_fields.all() - }, - ) - - def removal_action(): - if action.remove_all_tags: - if not use_overrides: - doc_tag_ids.clear() - else: - overrides.tag_ids = None - else: - tag_ids_to_remove: set[int] = set() - for tag in action.remove_tags.all(): - tag_ids_to_remove.add(tag.pk) - tag_ids_to_remove.update(int(pk) for pk in 
tag.get_descendants_pks()) - - if not use_overrides: - doc_tag_ids[:] = [t for t in doc_tag_ids if t not in tag_ids_to_remove] - elif overrides.tag_ids: - overrides.tag_ids = [ - t for t in overrides.tag_ids if t not in tag_ids_to_remove - ] - - if not use_overrides and ( - action.remove_all_correspondents - or ( - document.correspondent - and action.remove_correspondents.filter( - pk=document.correspondent.pk, - ).exists() - ) - ): - document.correspondent = None - elif use_overrides and ( - action.remove_all_correspondents - or ( - overrides.correspondent_id - and action.remove_correspondents.filter( - pk=overrides.correspondent_id, - ).exists() - ) - ): - overrides.correspondent_id = None - - if not use_overrides and ( - action.remove_all_document_types - or ( - document.document_type - and action.remove_document_types.filter( - pk=document.document_type.pk, - ).exists() - ) - ): - document.document_type = None - elif use_overrides and ( - action.remove_all_document_types - or ( - overrides.document_type_id - and action.remove_document_types.filter( - pk=overrides.document_type_id, - ).exists() - ) - ): - overrides.document_type_id = None - - if not use_overrides and ( - action.remove_all_storage_paths - or ( - document.storage_path - and action.remove_storage_paths.filter( - pk=document.storage_path.pk, - ).exists() - ) - ): - document.storage_path = None - elif use_overrides and ( - action.remove_all_storage_paths - or ( - overrides.storage_path_id - and action.remove_storage_paths.filter( - pk=overrides.storage_path_id, - ).exists() - ) - ): - overrides.storage_path_id = None - - if not use_overrides and ( - action.remove_all_owners - or ( - document.owner - and action.remove_owners.filter(pk=document.owner.pk).exists() - ) - ): - document.owner = None - elif use_overrides and ( - action.remove_all_owners - or ( - overrides.owner_id - and action.remove_owners.filter(pk=overrides.owner_id).exists() - ) - ): - overrides.owner_id = None - - if 
action.remove_all_permissions: - if not use_overrides: - permissions = { - "view": {"users": [], "groups": []}, - "change": {"users": [], "groups": []}, - } - set_permissions_for_object( - permissions=permissions, - object=document, - merge=False, - ) - else: - overrides.view_users = None - overrides.view_groups = None - overrides.change_users = None - overrides.change_groups = None - elif any( - [ - action.remove_view_users.exists(), - action.remove_view_groups.exists(), - action.remove_change_users.exists(), - action.remove_change_groups.exists(), - ], - ): - if not use_overrides: - for user in action.remove_view_users.all(): - remove_perm("view_document", user, document) - for user in action.remove_change_users.all(): - remove_perm("change_document", user, document) - for group in action.remove_view_groups.all(): - remove_perm("view_document", group, document) - for group in action.remove_change_groups.all(): - remove_perm("change_document", group, document) - else: - if overrides.view_users: - for user in action.remove_view_users.filter( - pk__in=overrides.view_users, - ): - overrides.view_users.remove(user.pk) - if overrides.change_users: - for user in action.remove_change_users.filter( - pk__in=overrides.change_users, - ): - overrides.change_users.remove(user.pk) - if overrides.view_groups: - for group in action.remove_view_groups.filter( - pk__in=overrides.view_groups, - ): - overrides.view_groups.remove(group.pk) - if overrides.change_groups: - for group in action.remove_change_groups.filter( - pk__in=overrides.change_groups, - ): - overrides.change_groups.remove(group.pk) - - if action.remove_all_custom_fields: - if not use_overrides: - CustomFieldInstance.objects.filter(document=document).hard_delete() - else: - overrides.custom_fields = None - elif action.remove_custom_fields.exists(): - if not use_overrides: - CustomFieldInstance.objects.filter( - field__in=action.remove_custom_fields.all(), - document=document, - ).hard_delete() - elif 
overrides.custom_fields: - for field in action.remove_custom_fields.filter( - pk__in=overrides.custom_fields.keys(), - ): - overrides.custom_fields.pop(field.pk, None) - - def email_action(): - if not settings.EMAIL_ENABLED: - logger.error( - "Email backend has not been configured, cannot send email notifications", - extra={"group": logging_group}, - ) - return - - if not use_overrides: - title = document.title - doc_url = ( - f"{settings.PAPERLESS_URL}{settings.BASE_URL}documents/{document.pk}/" - ) - correspondent = ( - document.correspondent.name if document.correspondent else "" - ) - document_type = ( - document.document_type.name if document.document_type else "" - ) - owner_username = document.owner.username if document.owner else "" - filename = document.original_filename or "" - current_filename = document.filename or "" - added = timezone.localtime(document.added) - created = document.created - else: - title = overrides.title if overrides.title else str(document.original_file) - doc_url = "" - correspondent = ( - Correspondent.objects.filter(pk=overrides.correspondent_id).first() - if overrides.correspondent_id - else "" - ) - document_type = ( - DocumentType.objects.filter(pk=overrides.document_type_id).first().name - if overrides.document_type_id - else "" - ) - owner_username = ( - User.objects.filter(pk=overrides.owner_id).first().username - if overrides.owner_id - else "" - ) - filename = document.original_file if document.original_file else "" - current_filename = filename - added = timezone.localtime(timezone.now()) - created = overrides.created - - subject = ( - parse_w_workflow_placeholders( - action.email.subject, - correspondent, - document_type, - owner_username, - added, - filename, - current_filename, - created, - title, - doc_url, - ) - if action.email.subject - else "" - ) - body = ( - parse_w_workflow_placeholders( - action.email.body, - correspondent, - document_type, - owner_username, - added, - filename, - current_filename, - created, 
- title, - doc_url, - ) - if action.email.body - else "" - ) - try: - attachments: list[EmailAttachment] = [] - if action.email.include_document: - attachment: EmailAttachment | None = None - if trigger_type in [ - WorkflowTrigger.WorkflowTriggerType.DOCUMENT_UPDATED, - WorkflowTrigger.WorkflowTriggerType.SCHEDULED, - ] and isinstance(document, Document): - friendly_name = ( - Path(current_filename).name - if current_filename - else document.source_path.name - ) - attachment = EmailAttachment( - path=document.source_path, - mime_type=document.mime_type, - friendly_name=friendly_name, - ) - elif original_file: - friendly_name = ( - Path(current_filename).name - if current_filename - else original_file.name - ) - attachment = EmailAttachment( - path=original_file, - mime_type=document.mime_type, - friendly_name=friendly_name, - ) - if attachment: - attachments = [attachment] - n_messages = send_email( - subject=subject, - body=body, - to=action.email.to.split(","), - attachments=attachments, - ) - logger.debug( - f"Sent {n_messages} notification email(s) to {action.email.to}", - extra={"group": logging_group}, - ) - except Exception as e: - logger.exception( - f"Error occurred sending notification email: {e}", - extra={"group": logging_group}, - ) - - def webhook_action(): - if not use_overrides: - title = document.title - doc_url = ( - f"{settings.PAPERLESS_URL}{settings.BASE_URL}documents/{document.pk}/" - ) - correspondent = ( - document.correspondent.name if document.correspondent else "" - ) - document_type = ( - document.document_type.name if document.document_type else "" - ) - owner_username = document.owner.username if document.owner else "" - filename = document.original_filename or "" - current_filename = document.filename or "" - added = timezone.localtime(document.added) - created = document.created - else: - title = overrides.title if overrides.title else str(document.original_file) - doc_url = "" - correspondent = ( - 
Correspondent.objects.filter(pk=overrides.correspondent_id).first() - if overrides.correspondent_id - else "" - ) - document_type = ( - DocumentType.objects.filter(pk=overrides.document_type_id).first().name - if overrides.document_type_id - else "" - ) - owner_username = ( - User.objects.filter(pk=overrides.owner_id).first().username - if overrides.owner_id - else "" - ) - filename = document.original_file if document.original_file else "" - current_filename = filename - added = timezone.localtime(timezone.now()) - created = overrides.created - - try: - data = {} - if action.webhook.use_params: - if action.webhook.params: - try: - for key, value in action.webhook.params.items(): - data[key] = parse_w_workflow_placeholders( - value, - correspondent, - document_type, - owner_username, - added, - filename, - current_filename, - created, - title, - doc_url, - ) - except Exception as e: - logger.error( - f"Error occurred parsing webhook params: {e}", - extra={"group": logging_group}, - ) - elif action.webhook.body: - data = parse_w_workflow_placeholders( - action.webhook.body, - correspondent, - document_type, - owner_username, - added, - filename, - current_filename, - created, - title, - doc_url, - ) - headers = {} - if action.webhook.headers: - try: - headers = { - str(k): str(v) for k, v in action.webhook.headers.items() - } - except Exception as e: - logger.error( - f"Error occurred parsing webhook headers: {e}", - extra={"group": logging_group}, - ) - files = None - if action.webhook.include_document: - with original_file.open("rb") as f: - files = { - "file": ( - filename, - f.read(), - document.mime_type, - ), - } - send_webhook.delay( - url=action.webhook.url, - data=data, - headers=headers, - files=files, - as_json=action.webhook.as_json, - ) - logger.debug( - f"Webhook to {action.webhook.url} queued", - extra={"group": logging_group}, - ) - except Exception as e: - logger.exception( - f"Error occurred sending webhook: {e}", - extra={"group": logging_group}, 
- ) + Passing `workflow_to_run` skips the workflow query (currently only used by scheduled runs). + """ use_overrides = overrides is not None if original_file is None: @@ -1341,30 +696,7 @@ def run_workflows( ) messages = [] - workflows = ( - ( - Workflow.objects.filter(enabled=True, triggers__type=trigger_type) - .prefetch_related( - "actions", - "actions__assign_view_users", - "actions__assign_view_groups", - "actions__assign_change_users", - "actions__assign_change_groups", - "actions__assign_custom_fields", - "actions__remove_tags", - "actions__remove_correspondents", - "actions__remove_document_types", - "actions__remove_storage_paths", - "actions__remove_custom_fields", - "actions__remove_owners", - "triggers", - ) - .order_by("order") - .distinct() - ) - if workflow_to_run is None - else [workflow_to_run] - ) + workflows = get_workflows_for_trigger(trigger_type, workflow_to_run) for workflow in workflows: if not use_overrides: @@ -1384,13 +716,39 @@ def run_workflows( messages.append(message) if action.type == WorkflowAction.WorkflowActionType.ASSIGNMENT: - assignment_action() + if use_overrides and overrides: + apply_assignment_to_overrides(action, overrides) + else: + apply_assignment_to_document( + action, + document, + doc_tag_ids, + logging_group, + ) elif action.type == WorkflowAction.WorkflowActionType.REMOVAL: - removal_action() + if use_overrides and overrides: + apply_removal_to_overrides(action, overrides) + else: + apply_removal_to_document(action, document, doc_tag_ids) elif action.type == WorkflowAction.WorkflowActionType.EMAIL: - email_action() + context = build_workflow_action_context(document, overrides) + execute_email_action( + action, + document, + context, + logging_group, + original_file, + trigger_type, + ) elif action.type == WorkflowAction.WorkflowActionType.WEBHOOK: - webhook_action() + context = build_workflow_action_context(document, overrides) + execute_webhook_action( + action, + document, + context, + logging_group, + 
original_file, + ) if not use_overrides: # limit title to 128 characters diff --git a/src/documents/tasks.py b/src/documents/tasks.py index 17bfce3b0..606f278db 100644 --- a/src/documents/tasks.py +++ b/src/documents/tasks.py @@ -41,7 +41,6 @@ from documents.models import DocumentType from documents.models import PaperlessTask from documents.models import StoragePath from documents.models import Tag -from documents.models import Workflow from documents.models import WorkflowRun from documents.models import WorkflowTrigger from documents.parsers import DocumentParser @@ -54,6 +53,7 @@ from documents.sanity_checker import SanityCheckFailedException from documents.signals import document_updated from documents.signals.handlers import cleanup_document_deletion from documents.signals.handlers import run_workflows +from documents.workflows.utils import get_workflows_for_trigger if settings.AUDIT_LOG_ENABLED: from auditlog.models import LogEntry @@ -400,13 +400,8 @@ def check_scheduled_workflows(): Once a document satisfies this condition, and recurring/non-recurring constraints are met, the workflow is run. 
""" - scheduled_workflows: list[Workflow] = ( - Workflow.objects.filter( - triggers__type=WorkflowTrigger.WorkflowTriggerType.SCHEDULED, - enabled=True, - ) - .distinct() - .prefetch_related("triggers") + scheduled_workflows = get_workflows_for_trigger( + WorkflowTrigger.WorkflowTriggerType.SCHEDULED, ) if scheduled_workflows.count() > 0: logger.debug(f"Checking {len(scheduled_workflows)} scheduled workflows") diff --git a/src/documents/tests/test_workflows.py b/src/documents/tests/test_workflows.py index 6438e4b10..e606bc5a0 100644 --- a/src/documents/tests/test_workflows.py +++ b/src/documents/tests/test_workflows.py @@ -26,7 +26,7 @@ from rest_framework.test import APITestCase from documents.file_handling import create_source_path_directory from documents.file_handling import generate_unique_filename from documents.signals.handlers import run_workflows -from documents.signals.handlers import send_webhook +from documents.workflows.webhooks import send_webhook if TYPE_CHECKING: from django.db.models import QuerySet @@ -2858,7 +2858,7 @@ class TestWorkflows( mock_email_send.return_value = 1 - with self.assertNoLogs("paperless.handlers", level="ERROR"): + with self.assertNoLogs("paperless.workflows", level="ERROR"): run_workflows( WorkflowTrigger.WorkflowTriggerType.CONSUMPTION, consumable_document, @@ -3096,7 +3096,7 @@ class TestWorkflows( original_filename="sample.pdf", ) - with self.assertLogs("paperless.handlers", level="ERROR") as cm: + with self.assertLogs("paperless.workflows.actions", level="ERROR") as cm: run_workflows(WorkflowTrigger.WorkflowTriggerType.DOCUMENT_UPDATED, doc) expected_str = "Email backend has not been configured" @@ -3144,7 +3144,7 @@ class TestWorkflows( original_filename="sample.pdf", ) - with self.assertLogs("paperless.handlers", level="ERROR") as cm: + with self.assertLogs("paperless.workflows", level="ERROR") as cm: run_workflows(WorkflowTrigger.WorkflowTriggerType.DOCUMENT_UPDATED, doc) expected_str = "Error occurred sending email" 
@@ -3215,7 +3215,7 @@ class TestWorkflows( PAPERLESS_FORCE_SCRIPT_NAME="/paperless", BASE_URL="/paperless/", ) - @mock.patch("documents.signals.handlers.send_webhook.delay") + @mock.patch("documents.workflows.webhooks.send_webhook.delay") def test_workflow_webhook_action_body(self, mock_post): """ GIVEN: @@ -3274,7 +3274,7 @@ class TestWorkflows( @override_settings( PAPERLESS_URL="http://localhost:8000", ) - @mock.patch("documents.signals.handlers.send_webhook.delay") + @mock.patch("documents.workflows.webhooks.send_webhook.delay") def test_workflow_webhook_action_w_files(self, mock_post): """ GIVEN: @@ -3377,7 +3377,7 @@ class TestWorkflows( ) # fails because no file - with self.assertLogs("paperless.handlers", level="ERROR") as cm: + with self.assertLogs("paperless.workflows", level="ERROR") as cm: run_workflows(WorkflowTrigger.WorkflowTriggerType.DOCUMENT_UPDATED, doc) expected_str = "Error occurred sending webhook" @@ -3420,7 +3420,7 @@ class TestWorkflows( original_filename="sample.pdf", ) - with self.assertLogs("paperless.handlers", level="ERROR") as cm: + with self.assertLogs("paperless.workflows", level="ERROR") as cm: run_workflows(WorkflowTrigger.WorkflowTriggerType.DOCUMENT_UPDATED, doc) expected_str = "Error occurred parsing webhook params" @@ -3436,7 +3436,7 @@ class TestWorkflows( raise_for_status=mock.Mock(), ) - with self.assertLogs("paperless.handlers") as cm: + with self.assertLogs("paperless.workflows") as cm: send_webhook( url="http://paperless-ngx.com", data="Test message", @@ -3482,7 +3482,7 @@ class TestWorkflows( ), ) - with self.assertLogs("paperless.handlers") as cm: + with self.assertLogs("paperless.workflows") as cm: with self.assertRaises(HTTPStatusError): send_webhook( url="http://paperless-ngx.com", @@ -3498,7 +3498,7 @@ class TestWorkflows( ) self.assertIn(expected_str, cm.output[0]) - @mock.patch("documents.signals.handlers.send_webhook.delay") + @mock.patch("documents.workflows.webhooks.send_webhook.delay") def 
test_workflow_webhook_action_consumption(self, mock_post): """ GIVEN: diff --git a/src/documents/workflows/__init__.py b/src/documents/workflows/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/src/documents/workflows/actions.py b/src/documents/workflows/actions.py new file mode 100644 index 000000000..040cbc127 --- /dev/null +++ b/src/documents/workflows/actions.py @@ -0,0 +1,261 @@ +import logging +from pathlib import Path + +from django.conf import settings +from django.contrib.auth.models import User +from django.utils import timezone + +from documents.data_models import ConsumableDocument +from documents.data_models import DocumentMetadataOverrides +from documents.mail import EmailAttachment +from documents.mail import send_email +from documents.models import Correspondent +from documents.models import Document +from documents.models import DocumentType +from documents.models import WorkflowAction +from documents.models import WorkflowTrigger +from documents.templating.workflows import parse_w_workflow_placeholders +from documents.workflows.webhooks import send_webhook + +logger = logging.getLogger("paperless.workflows.actions") + + +def build_workflow_action_context( + document: Document | ConsumableDocument, + overrides: DocumentMetadataOverrides | None, +) -> dict: + """ + Build context dictionary for workflow action placeholder parsing. 
+ """ + use_overrides = overrides is not None + + if not use_overrides: + return { + "title": document.title, + "doc_url": f"{settings.PAPERLESS_URL}{settings.BASE_URL}documents/{document.pk}/", + "correspondent": document.correspondent.name + if document.correspondent + else "", + "document_type": document.document_type.name + if document.document_type + else "", + "owner_username": document.owner.username if document.owner else "", + "filename": document.original_filename or "", + "current_filename": document.filename or "", + "added": timezone.localtime(document.added), + "created": document.created, + } + + correspondent_obj = ( + Correspondent.objects.filter(pk=overrides.correspondent_id).first() + if overrides and overrides.correspondent_id + else None + ) + document_type_obj = ( + DocumentType.objects.filter(pk=overrides.document_type_id).first() + if overrides and overrides.document_type_id + else None + ) + owner_obj = ( + User.objects.filter(pk=overrides.owner_id).first() + if overrides and overrides.owner_id + else None + ) + + filename = document.original_file if document.original_file else "" + return { + "title": overrides.title + if overrides and overrides.title + else str(document.original_file), + "doc_url": "", + "correspondent": correspondent_obj.name if correspondent_obj else "", + "document_type": document_type_obj.name if document_type_obj else "", + "owner_username": owner_obj.username if owner_obj else "", + "filename": filename, + "current_filename": filename, + "added": timezone.localtime(timezone.now()), + "created": overrides.created if overrides else None, + } + + +def execute_email_action( + action: WorkflowAction, + document: Document | ConsumableDocument, + context: dict, + logging_group, + original_file: Path, + trigger_type: WorkflowTrigger.WorkflowTriggerType, +) -> None: + """ + Execute an email action for a workflow. 
+ """ + + if not settings.EMAIL_ENABLED: + logger.error( + "Email backend has not been configured, cannot send email notifications", + extra={"group": logging_group}, + ) + return + + subject = ( + parse_w_workflow_placeholders( + action.email.subject, + context["correspondent"], + context["document_type"], + context["owner_username"], + context["added"], + context["filename"], + context["current_filename"], + context["created"], + context["title"], + context["doc_url"], + ) + if action.email.subject + else "" + ) + body = ( + parse_w_workflow_placeholders( + action.email.body, + context["correspondent"], + context["document_type"], + context["owner_username"], + context["added"], + context["filename"], + context["current_filename"], + context["created"], + context["title"], + context["doc_url"], + ) + if action.email.body + else "" + ) + + try: + attachments: list[EmailAttachment] = [] + if action.email.include_document: + attachment: EmailAttachment | None = None + if trigger_type in [ + WorkflowTrigger.WorkflowTriggerType.DOCUMENT_UPDATED, + WorkflowTrigger.WorkflowTriggerType.SCHEDULED, + ] and isinstance(document, Document): + friendly_name = ( + Path(context["current_filename"]).name + if context["current_filename"] + else document.source_path.name + ) + attachment = EmailAttachment( + path=document.source_path, + mime_type=document.mime_type, + friendly_name=friendly_name, + ) + elif original_file: + friendly_name = ( + Path(context["current_filename"]).name + if context["current_filename"] + else original_file.name + ) + attachment = EmailAttachment( + path=original_file, + mime_type=document.mime_type, + friendly_name=friendly_name, + ) + if attachment: + attachments = [attachment] + + n_messages = send_email( + subject=subject, + body=body, + to=action.email.to.split(","), + attachments=attachments, + ) + logger.debug( + f"Sent {n_messages} notification email(s) to {action.email.to}", + extra={"group": logging_group}, + ) + except Exception as e: + 
logger.exception( + f"Error occurred sending notification email: {e}", + extra={"group": logging_group}, + ) + + +def execute_webhook_action( + action: WorkflowAction, + document: Document | ConsumableDocument, + context: dict, + logging_group, + original_file: Path, +): + try: + data = {} + if action.webhook.use_params: + if action.webhook.params: + try: + for key, value in action.webhook.params.items(): + data[key] = parse_w_workflow_placeholders( + value, + context["correspondent"], + context["document_type"], + context["owner_username"], + context["added"], + context["filename"], + context["current_filename"], + context["created"], + context["title"], + context["doc_url"], + ) + except Exception as e: + logger.error( + f"Error occurred parsing webhook params: {e}", + extra={"group": logging_group}, + ) + elif action.webhook.body: + data = parse_w_workflow_placeholders( + action.webhook.body, + context["correspondent"], + context["document_type"], + context["owner_username"], + context["added"], + context["filename"], + context["current_filename"], + context["created"], + context["title"], + context["doc_url"], + ) + headers = {} + if action.webhook.headers: + try: + headers = {str(k): str(v) for k, v in action.webhook.headers.items()} + except Exception as e: + logger.error( + f"Error occurred parsing webhook headers: {e}", + extra={"group": logging_group}, + ) + files = None + if action.webhook.include_document: + with original_file.open("rb") as f: + files = { + "file": ( + str(context["filename"]) + if context["filename"] + else original_file.name, + f.read(), + document.mime_type, + ), + } + send_webhook.delay( + url=action.webhook.url, + data=data, + headers=headers, + files=files, + as_json=action.webhook.as_json, + ) + logger.debug( + f"Webhook to {action.webhook.url} queued", + extra={"group": logging_group}, + ) + except Exception as e: + logger.exception( + f"Error occurred sending webhook: {e}", + extra={"group": logging_group}, + ) diff --git 
# ---------------------------------------------------------------------------
# src/documents/workflows/mutations.py
# ---------------------------------------------------------------------------
import logging

from django.utils import timezone
from guardian.shortcuts import remove_perm

from documents.data_models import DocumentMetadataOverrides
from documents.models import CustomFieldInstance
from documents.models import Document
from documents.models import WorkflowAction
from documents.permissions import set_permissions_for_object
from documents.templating.workflows import parse_w_workflow_placeholders

logger = logging.getLogger("paperless.workflows.mutations")


def _tag_pks_with_ancestors(tags) -> set[int]:
    """Return the pks of *tags* together with all of their ancestors' pks."""
    pks: set[int] = set()
    for tag in tags:
        pks.add(tag.pk)
        pks.update(int(pk) for pk in tag.get_ancestors_pks())
    return pks


def _tag_pks_with_descendants(tags) -> set[int]:
    """Return the pks of *tags* together with all of their descendants' pks."""
    pks: set[int] = set()
    for tag in tags:
        pks.add(tag.pk)
        pks.update(int(pk) for pk in tag.get_descendants_pks())
    return pks


def apply_assignment_to_document(
    action: WorkflowAction,
    document: Document,
    doc_tag_ids: list[int],
    logging_group,
):
    """
    Apply assignment actions to a Document instance.

    action: WorkflowAction, annotated with 'has_assign_*' boolean fields

    doc_tag_ids is mutated in place so the caller sees the updated tag set.
    """
    if action.has_assign_tags:
        # Assigning a tag implicitly assigns all of its ancestors.
        doc_tag_ids[:] = list(
            set(doc_tag_ids) | _tag_pks_with_ancestors(action.assign_tags.all()),
        )

    if action.assign_correspondent:
        document.correspondent = action.assign_correspondent

    if action.assign_document_type:
        document.document_type = action.assign_document_type

    if action.assign_storage_path:
        document.storage_path = action.assign_storage_path

    if action.assign_owner:
        document.owner = action.assign_owner

    if action.assign_title:
        try:
            document.title = parse_w_workflow_placeholders(
                action.assign_title,
                document.correspondent.name if document.correspondent else "",
                document.document_type.name if document.document_type else "",
                document.owner.username if document.owner else "",
                timezone.localtime(document.added),
                document.original_filename or "",
                document.filename or "",
                document.created,
            )
        except Exception:  # pragma: no cover
            logger.exception(
                f"Error occurred parsing title assignment '{action.assign_title}', falling back to original",
                extra={"group": logging_group},
            )

    if any(
        [
            action.has_assign_view_users,
            action.has_assign_view_groups,
            action.has_assign_change_users,
            action.has_assign_change_groups,
        ],
    ):
        permissions = {
            "view": {
                "users": action.assign_view_users.values_list("id", flat=True),
                "groups": action.assign_view_groups.values_list("id", flat=True),
            },
            "change": {
                "users": action.assign_change_users.values_list("id", flat=True),
                "groups": action.assign_change_groups.values_list("id", flat=True),
            },
        }
        # merge=True keeps permissions the document already had.
        set_permissions_for_object(
            permissions=permissions,
            object=document,
            merge=True,
        )

    if action.has_assign_custom_fields:
        for field in action.assign_custom_fields.all():
            value_field_name = CustomFieldInstance.get_value_field_name(
                data_type=field.data_type,
            )
            args = {
                value_field_name: action.assign_custom_fields_values.get(
                    str(field.pk),
                    None,
                ),
            }
            # for some reason update_or_create doesn't work here
            instance = CustomFieldInstance.objects.filter(
                field=field,
                document=document,
            ).first()
            if instance and args[value_field_name] is not None:
                setattr(instance, value_field_name, args[value_field_name])
                instance.save()
            elif not instance:
                CustomFieldInstance.objects.create(
                    **args,
                    field=field,
                    document=document,
                )


def apply_assignment_to_overrides(
    action: WorkflowAction,
    overrides: DocumentMetadataOverrides,
):
    """
    Apply assignment actions to DocumentMetadataOverrides.

    action: WorkflowAction, annotated with 'has_assign_*' boolean fields
    """
    if action.has_assign_tags:
        if overrides.tag_ids is None:
            overrides.tag_ids = []
        # Assigning a tag implicitly assigns all of its ancestors.
        overrides.tag_ids = list(
            set(overrides.tag_ids)
            | _tag_pks_with_ancestors(action.assign_tags.all()),
        )

    if action.assign_correspondent:
        overrides.correspondent_id = action.assign_correspondent.pk

    if action.assign_document_type:
        overrides.document_type_id = action.assign_document_type.pk

    if action.assign_storage_path:
        overrides.storage_path_id = action.assign_storage_path.pk

    if action.assign_owner:
        overrides.owner_id = action.assign_owner.pk

    if action.assign_title:
        # Placeholders are rendered later, once the document exists.
        overrides.title = action.assign_title

    if any(
        [
            action.has_assign_view_users,
            action.has_assign_view_groups,
            action.has_assign_change_users,
            action.has_assign_change_groups,
        ],
    ):
        overrides.view_users = list(
            set(
                (overrides.view_users or [])
                + list(action.assign_view_users.values_list("id", flat=True)),
            ),
        )
        overrides.view_groups = list(
            set(
                (overrides.view_groups or [])
                + list(action.assign_view_groups.values_list("id", flat=True)),
            ),
        )
        overrides.change_users = list(
            set(
                (overrides.change_users or [])
                + list(action.assign_change_users.values_list("id", flat=True)),
            ),
        )
        overrides.change_groups = list(
            set(
                (overrides.change_groups or [])
                + list(action.assign_change_groups.values_list("id", flat=True)),
            ),
        )

    if action.has_assign_custom_fields:
        if overrides.custom_fields is None:
            overrides.custom_fields = {}
        overrides.custom_fields.update(
            {
                field.pk: action.assign_custom_fields_values.get(
                    str(field.pk),
                    None,
                )
                for field in action.assign_custom_fields.all()
            },
        )


def apply_removal_to_document(
    action: WorkflowAction,
    document: Document,
    doc_tag_ids: list[int],
):
    """
    Apply removal actions to a Document instance.

    action: WorkflowAction, annotated with 'has_remove_*' boolean fields

    doc_tag_ids is mutated in place so the caller sees the updated tag set.
    """
    if action.remove_all_tags:
        doc_tag_ids.clear()
    else:
        # Removing a tag implicitly removes all of its descendants.
        tag_ids_to_remove = _tag_pks_with_descendants(action.remove_tags.all())
        doc_tag_ids[:] = [t for t in doc_tag_ids if t not in tag_ids_to_remove]

    if action.remove_all_correspondents or (
        document.correspondent
        and action.remove_correspondents.filter(pk=document.correspondent.pk).exists()
    ):
        document.correspondent = None

    if action.remove_all_document_types or (
        document.document_type
        and action.remove_document_types.filter(pk=document.document_type.pk).exists()
    ):
        document.document_type = None

    if action.remove_all_storage_paths or (
        document.storage_path
        and action.remove_storage_paths.filter(pk=document.storage_path.pk).exists()
    ):
        document.storage_path = None

    if action.remove_all_owners or (
        document.owner and action.remove_owners.filter(pk=document.owner.pk).exists()
    ):
        document.owner = None

    if action.remove_all_permissions:
        # merge=False replaces the object's permissions with the empty set.
        permissions = {
            "view": {"users": [], "groups": []},
            "change": {"users": [], "groups": []},
        }
        set_permissions_for_object(
            permissions=permissions,
            object=document,
            merge=False,
        )

    if any(
        [
            action.has_remove_view_users,
            action.has_remove_view_groups,
            action.has_remove_change_users,
            action.has_remove_change_groups,
        ],
    ):
        for user in action.remove_view_users.all():
            remove_perm("view_document", user, document)
        for user in action.remove_change_users.all():
            remove_perm("change_document", user, document)
        for group in action.remove_view_groups.all():
            remove_perm("view_document", group, document)
        for group in action.remove_change_groups.all():
            remove_perm("change_document", group, document)

    if action.remove_all_custom_fields:
        CustomFieldInstance.objects.filter(document=document).hard_delete()
    elif action.has_remove_custom_fields:
        CustomFieldInstance.objects.filter(
            field__in=action.remove_custom_fields.all(),
            document=document,
        ).hard_delete()


def apply_removal_to_overrides(
    action: WorkflowAction,
    overrides: DocumentMetadataOverrides,
):
    """
    Apply removal actions to DocumentMetadataOverrides.

    action: WorkflowAction, annotated with 'has_remove_*' boolean fields
    """
    if action.remove_all_tags:
        overrides.tag_ids = None
    elif overrides.tag_ids:
        # Removing a tag implicitly removes all of its descendants.
        tag_ids_to_remove = _tag_pks_with_descendants(action.remove_tags.all())
        overrides.tag_ids = [t for t in overrides.tag_ids if t not in tag_ids_to_remove]

    if action.remove_all_correspondents or (
        overrides.correspondent_id
        and action.remove_correspondents.filter(pk=overrides.correspondent_id).exists()
    ):
        overrides.correspondent_id = None

    if action.remove_all_document_types or (
        overrides.document_type_id
        and action.remove_document_types.filter(pk=overrides.document_type_id).exists()
    ):
        overrides.document_type_id = None

    if action.remove_all_storage_paths or (
        overrides.storage_path_id
        and action.remove_storage_paths.filter(pk=overrides.storage_path_id).exists()
    ):
        overrides.storage_path_id = None

    if action.remove_all_owners or (
        overrides.owner_id
        and action.remove_owners.filter(pk=overrides.owner_id).exists()
    ):
        overrides.owner_id = None

    if action.remove_all_permissions:
        overrides.view_users = None
        overrides.view_groups = None
        overrides.change_users = None
        overrides.change_groups = None
    elif any(
        [
            action.has_remove_view_users,
            action.has_remove_view_groups,
            action.has_remove_change_users,
            action.has_remove_change_groups,
        ],
    ):
        if overrides.view_users:
            for user in action.remove_view_users.filter(pk__in=overrides.view_users):
                overrides.view_users.remove(user.pk)
        if overrides.change_users:
            for user in action.remove_change_users.filter(
                pk__in=overrides.change_users,
            ):
                overrides.change_users.remove(user.pk)
        if overrides.view_groups:
            for group in action.remove_view_groups.filter(pk__in=overrides.view_groups):
                overrides.view_groups.remove(group.pk)
        if overrides.change_groups:
            for group in action.remove_change_groups.filter(
                pk__in=overrides.change_groups,
            ):
                overrides.change_groups.remove(group.pk)

    if action.remove_all_custom_fields:
        overrides.custom_fields = None
    elif action.has_remove_custom_fields and overrides.custom_fields:
        for field in action.remove_custom_fields.filter(
            pk__in=overrides.custom_fields.keys(),
        ):
            overrides.custom_fields.pop(field.pk, None)


# ---------------------------------------------------------------------------
# src/documents/workflows/utils.py
# ---------------------------------------------------------------------------
import logging

from django.db.models import Exists
from django.db.models import OuterRef
from django.db.models import Prefetch

from documents.models import Workflow
from documents.models import WorkflowAction
from documents.models import WorkflowTrigger

logger = logging.getLogger("paperless.workflows")

# m2m relations whose (non-)emptiness the workflow runner needs as cheap
# booleans; each becomes a `has_<name>` Exists() annotation on WorkflowAction.
_M2M_EXISTS_FIELDS = (
    "assign_tags",
    "assign_view_users",
    "assign_view_groups",
    "assign_change_users",
    "assign_change_groups",
    "assign_custom_fields",
    "remove_view_users",
    "remove_view_groups",
    "remove_change_users",
    "remove_change_groups",
    "remove_custom_fields",
)


def get_workflows_for_trigger(
    trigger_type: WorkflowTrigger.WorkflowTriggerType,
    workflow_to_run: Workflow | None = None,
):
    """
    Return workflows relevant to a trigger. If a specific workflow is given,
    wrap it in a list; otherwise fetch enabled workflows for the trigger with
    the prefetches used by the runner.
    """
    if workflow_to_run is not None:
        return [workflow_to_run]

    # Build the `has_*` Exists() subquery annotations from the field list
    # instead of eleven copy-pasted clauses.
    exists_annotations = {
        f"has_{name}": Exists(
            getattr(WorkflowAction, name).through.objects.filter(
                workflowaction_id=OuterRef("pk"),
            ),
        )
        for name in _M2M_EXISTS_FIELDS
    }

    annotated_actions = (
        WorkflowAction.objects.select_related(
            "assign_correspondent",
            "assign_document_type",
            "assign_storage_path",
            "assign_owner",
            "email",
            "webhook",
        )
        .prefetch_related(
            "assign_tags",
            "assign_view_users",
            "assign_view_groups",
            "assign_change_users",
            "assign_change_groups",
            "assign_custom_fields",
            "remove_tags",
            "remove_correspondents",
            "remove_document_types",
            "remove_storage_paths",
            "remove_custom_fields",
            "remove_owners",
        )
        .annotate(**exists_annotations)
    )

    return (
        Workflow.objects.filter(enabled=True, triggers__type=trigger_type)
        .prefetch_related(
            Prefetch("actions", queryset=annotated_actions),
            "triggers",
        )
        .order_by("order")
        .distinct()
    )


# ---------------------------------------------------------------------------
# src/documents/workflows/webhooks.py
# ---------------------------------------------------------------------------
import ipaddress
import logging
import socket
from urllib.parse import urlparse

import httpx
from celery import shared_task
from django.conf import settings

logger = logging.getLogger("paperless.workflows.webhooks")


def _is_public_ip(ip: str) -> bool:
    """True if *ip* parses as an address that is not private, loopback,
    link-local, multicast, or unspecified; False for unparsable input."""
    try:
        obj = ipaddress.ip_address(ip)
        return not (
            obj.is_private
            or obj.is_loopback
            or obj.is_link_local
            or obj.is_multicast
            or obj.is_unspecified
        )
    except ValueError:  # pragma: no cover
        return False


def _resolve_first_ip(host: str) -> str | None:
    """Resolve *host* via DNS and return the first address, or None on failure."""
    try:
        info = socket.getaddrinfo(host, None)
        return info[0][4][0] if info else None
    except Exception:  # pragma: no cover
        return None


@shared_task(
    retry_backoff=True,
    autoretry_for=(httpx.HTTPStatusError,),
    max_retries=3,
    throws=(httpx.HTTPError,),
)
def send_webhook(
    url: str,
    data: str | dict,
    headers: dict,
    files: dict,
    *,
    as_json: bool = False,
):
    """
    POST a workflow webhook, guarding against SSRF.

    Validates the URL scheme, destination port, and resolved destination IP
    against the WEBHOOKS_* settings before sending. Redirects are not
    followed; HTTP error statuses raise and trigger celery retries
    (autoretry_for above). Raises ValueError when the destination is blocked.
    """
    p = urlparse(url)
    if p.scheme.lower() not in settings.WEBHOOKS_ALLOWED_SCHEMES or not p.hostname:
        logger.warning("Webhook blocked: invalid scheme/hostname")
        raise ValueError("Invalid URL scheme or hostname.")

    port = p.port or (443 if p.scheme == "https" else 80)
    if (
        len(settings.WEBHOOKS_ALLOWED_PORTS) > 0
        and port not in settings.WEBHOOKS_ALLOWED_PORTS
    ):
        logger.warning("Webhook blocked: port not permitted")
        raise ValueError("Destination port not permitted.")

    # NOTE(review): only the first resolved address is checked, and the later
    # httpx.post resolves again — a DNS-rebinding window exists; confirm this
    # is an accepted trade-off.
    ip = _resolve_first_ip(p.hostname)
    if not ip or (
        not _is_public_ip(ip) and not settings.WEBHOOKS_ALLOW_INTERNAL_REQUESTS
    ):
        logger.warning("Webhook blocked: destination not allowed")
        raise ValueError("Destination host is not allowed.")

    try:
        post_args = {
            "url": url,
            # Strip any caller-supplied Host header so it cannot be spoofed.
            "headers": {
                k: v for k, v in (headers or {}).items() if k.lower() != "host"
            },
            "files": files or None,
            "timeout": 5.0,
            "follow_redirects": False,
        }
        if as_json:
            post_args["json"] = data
        elif isinstance(data, dict):
            post_args["data"] = data
        else:
            post_args["content"] = data

        httpx.post(
            **post_args,
        ).raise_for_status()
        logger.info(
            f"Webhook sent to {url}",
        )
    except Exception as e:
        logger.error(
            f"Failed attempt sending webhook to {url}: {e}",
        )
        # Bare `raise` re-raises the active exception with its original
        # traceback so celery's autoretry/throws handling still applies.
        raise