From dcd9d6cff3823d7f2d49ba6922b4e4c410253929 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 12 Nov 2025 13:49:28 +0000 Subject: [PATCH 1/5] Initial plan From 476b08a23be62e7abf2ba4f6e3330a598dec10d9 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 12 Nov 2025 13:56:47 +0000 Subject: [PATCH 2/5] Add AI permission classes and comprehensive tests Co-authored-by: dawnsystem <42047891+dawnsystem@users.noreply.github.com> --- .../migrations/1073_add_ai_permissions.py | 26 + src/documents/models.py | 6 + src/documents/permissions.py | 82 +++ src/documents/tests/test_ai_permissions.py | 524 ++++++++++++++++++ 4 files changed, 638 insertions(+) create mode 100644 src/documents/migrations/1073_add_ai_permissions.py create mode 100644 src/documents/tests/test_ai_permissions.py diff --git a/src/documents/migrations/1073_add_ai_permissions.py b/src/documents/migrations/1073_add_ai_permissions.py new file mode 100644 index 000000000..0fea83d94 --- /dev/null +++ b/src/documents/migrations/1073_add_ai_permissions.py @@ -0,0 +1,26 @@ +# Generated migration for adding AI-related custom permissions + +from django.db import migrations + + +class Migration(migrations.Migration): + dependencies = [ + ("documents", "1072_workflowtrigger_filter_custom_field_query_and_more"), + ] + + operations = [ + migrations.AlterModelOptions( + name="document", + options={ + "ordering": ("-created",), + "permissions": [ + ("can_view_ai_suggestions", "Can view AI suggestions"), + ("can_apply_ai_suggestions", "Can apply AI suggestions"), + ("can_approve_deletions", "Can approve AI-recommended deletions"), + ("can_configure_ai", "Can configure AI settings"), + ], + "verbose_name": "document", + "verbose_name_plural": "documents", + }, + ), + ] diff --git a/src/documents/models.py b/src/documents/models.py index 7b0b84b77..a31ce2e4d 100644 --- a/src/documents/models.py +++ b/src/documents/models.py @@ -317,6 +317,12 @@ class Document(SoftDeleteModel, ModelWithOwner): ordering = ("-created",) verbose_name = _("document") verbose_name_plural = _("documents") + permissions = [ + ("can_view_ai_suggestions", "Can view AI suggestions"), + ("can_apply_ai_suggestions", "Can apply AI suggestions"), + ("can_approve_deletions", "Can approve AI-recommended deletions"), + ("can_configure_ai", "Can configure AI settings"), + ] def __str__(self) -> str: created = self.created.isoformat() diff --git a/src/documents/permissions.py b/src/documents/permissions.py index cf6a9aa35..2ab20b497 100644 --- a/src/documents/permissions.py +++ b/src/documents/permissions.py @@ -219,3 +219,85 @@ class AcknowledgeTasksPermissions(BasePermission): perms = self.perms_map.get(request.method, []) return request.user.has_perms(perms) + + +class CanViewAISuggestionsPermission(BasePermission): + """ + Permission class to check if user can view AI suggestions. + + This permission allows users to view AI scan results and suggestions + for documents, including tags, correspondents, document types, and + other metadata suggestions. + """ + + def has_permission(self, request, view): + if not request.user or not request.user.is_authenticated: + return False + + # Superusers always have permission + if request.user.is_superuser: + return True + + # Check for specific permission + return request.user.has_perm("documents.can_view_ai_suggestions") + + +class CanApplyAISuggestionsPermission(BasePermission): + """ + Permission class to check if user can apply AI suggestions to documents. + + This permission allows users to apply AI-generated suggestions to documents, + such as auto-applying tags, correspondents, document types, etc. + """ + + def has_permission(self, request, view): + if not request.user or not request.user.is_authenticated: + return False + + # Superusers always have permission + if request.user.is_superuser: + return True + + # Check for specific permission + return request.user.has_perm("documents.can_apply_ai_suggestions") + + +class CanApproveDeletionsPermission(BasePermission): + """ + Permission class to check if user can approve AI-recommended deletions. + + This permission is required to approve deletion requests initiated by AI, + ensuring that no documents are deleted without explicit user authorization. + """ + + def has_permission(self, request, view): + if not request.user or not request.user.is_authenticated: + return False + + # Superusers always have permission + if request.user.is_superuser: + return True + + # Check for specific permission + return request.user.has_perm("documents.can_approve_deletions") + + +class CanConfigureAIPermission(BasePermission): + """ + Permission class to check if user can configure AI settings. + + This permission allows users to configure AI scanner settings, including + confidence thresholds, auto-apply behavior, and ML feature toggles. + Typically restricted to administrators. + """ + + def has_permission(self, request, view): + if not request.user or not request.user.is_authenticated: + return False + + # Superusers always have permission + if request.user.is_superuser: + return True + + # Check for specific permission + return request.user.has_perm("documents.can_configure_ai") diff --git a/src/documents/tests/test_ai_permissions.py b/src/documents/tests/test_ai_permissions.py new file mode 100644 index 000000000..f8266b2cd --- /dev/null +++ b/src/documents/tests/test_ai_permissions.py @@ -0,0 +1,524 @@ +""" +Unit tests for AI-related permissions. + +Tests cover: +- CanViewAISuggestionsPermission +- CanApplyAISuggestionsPermission +- CanApproveDeletionsPermission +- CanConfigureAIPermission +- Role-based access control +- Permission assignment and verification +""" + +from django.contrib.auth.models import Group, Permission, User +from django.contrib.contenttypes.models import ContentType +from django.test import TestCase +from rest_framework.test import APIRequestFactory + +from documents.models import Document +from documents.permissions import ( + CanApplyAISuggestionsPermission, + CanApproveDeletionsPermission, + CanConfigureAIPermission, + CanViewAISuggestionsPermission, +) + + +class MockView: + """Mock view for testing permissions.""" + + pass + + +class TestCanViewAISuggestionsPermission(TestCase): + """Test the CanViewAISuggestionsPermission class.""" + + def setUp(self): + """Set up test users and permissions.""" + self.factory = APIRequestFactory() + self.permission = CanViewAISuggestionsPermission() + self.view = MockView() + + # Create users + self.superuser = User.objects.create_superuser( + username="admin", email="admin@test.com", password="admin123" + ) + self.regular_user = User.objects.create_user( + username="regular", email="regular@test.com", password="regular123" + ) + self.permitted_user = User.objects.create_user( + username="permitted", email="permitted@test.com", password="permitted123" + ) + + # Assign permission to permitted_user + content_type = ContentType.objects.get_for_model(Document) + permission, created = Permission.objects.get_or_create( + codename="can_view_ai_suggestions", + name="Can view AI suggestions", + content_type=content_type, + ) + self.permitted_user.user_permissions.add(permission) + + def test_unauthenticated_user_denied(self): + """Test that unauthenticated users are denied.""" + request = self.factory.get("/api/ai/suggestions/") + request.user = None + + result = self.permission.has_permission(request, self.view) + + self.assertFalse(result) + + def test_superuser_allowed(self): + """Test that superusers are always allowed.""" + request = self.factory.get("/api/ai/suggestions/") + request.user = self.superuser + + result = self.permission.has_permission(request, self.view) + + self.assertTrue(result) + + def test_regular_user_without_permission_denied(self): + """Test that regular users without permission are denied.""" + request = self.factory.get("/api/ai/suggestions/") + request.user = self.regular_user + + result = self.permission.has_permission(request, self.view) + + self.assertFalse(result) + + def test_user_with_permission_allowed(self): + """Test that users with permission are allowed.""" + request = self.factory.get("/api/ai/suggestions/") + request.user = self.permitted_user + + result = self.permission.has_permission(request, self.view) + + self.assertTrue(result) + + +class TestCanApplyAISuggestionsPermission(TestCase): + """Test the CanApplyAISuggestionsPermission class.""" + + def setUp(self): + """Set up test users and permissions.""" + self.factory = APIRequestFactory() + self.permission = CanApplyAISuggestionsPermission() + self.view = MockView() + + # Create users + self.superuser = User.objects.create_superuser( + username="admin", email="admin@test.com", password="admin123" + ) + self.regular_user = User.objects.create_user( + username="regular", email="regular@test.com", password="regular123" + ) + self.permitted_user = User.objects.create_user( + username="permitted", email="permitted@test.com", password="permitted123" + ) + + # Assign permission to permitted_user + content_type = ContentType.objects.get_for_model(Document) + permission, created = Permission.objects.get_or_create( + codename="can_apply_ai_suggestions", + name="Can apply AI suggestions", + content_type=content_type, + ) + self.permitted_user.user_permissions.add(permission) + + def test_unauthenticated_user_denied(self): + """Test that unauthenticated users are denied.""" + request = self.factory.post("/api/ai/suggestions/apply/") + request.user = None + + result = self.permission.has_permission(request, self.view) + + self.assertFalse(result) + + def test_superuser_allowed(self): + """Test that superusers are always allowed.""" + request = self.factory.post("/api/ai/suggestions/apply/") + request.user = self.superuser + + result = self.permission.has_permission(request, self.view) + + self.assertTrue(result) + + def test_regular_user_without_permission_denied(self): + """Test that regular users without permission are denied.""" + request = self.factory.post("/api/ai/suggestions/apply/") + request.user = self.regular_user + + result = self.permission.has_permission(request, self.view) + + self.assertFalse(result) + + def test_user_with_permission_allowed(self): + """Test that users with permission are allowed.""" + request = self.factory.post("/api/ai/suggestions/apply/") + request.user = self.permitted_user + + result = self.permission.has_permission(request, self.view) + + self.assertTrue(result) + + +class TestCanApproveDeletionsPermission(TestCase): + """Test the CanApproveDeletionsPermission class.""" + + def setUp(self): + """Set up test users and permissions.""" + self.factory = APIRequestFactory() + self.permission = CanApproveDeletionsPermission() + self.view = MockView() + + # Create users + self.superuser = User.objects.create_superuser( + username="admin", email="admin@test.com", password="admin123" + ) + self.regular_user = User.objects.create_user( + username="regular", email="regular@test.com", password="regular123" + ) + self.permitted_user = User.objects.create_user( + username="permitted", email="permitted@test.com", password="permitted123" + ) + + # Assign permission to permitted_user + content_type = ContentType.objects.get_for_model(Document) + permission, created = Permission.objects.get_or_create( + codename="can_approve_deletions", + name="Can approve AI-recommended deletions", + content_type=content_type, + ) + self.permitted_user.user_permissions.add(permission) + + def test_unauthenticated_user_denied(self): + """Test that unauthenticated users are denied.""" + request = self.factory.post("/api/ai/deletions/approve/") + request.user = None + + result = self.permission.has_permission(request, self.view) + + self.assertFalse(result) + + def test_superuser_allowed(self): + """Test that superusers are always allowed.""" + request = self.factory.post("/api/ai/deletions/approve/") + request.user = self.superuser + + result = self.permission.has_permission(request, self.view) + + self.assertTrue(result) + + def test_regular_user_without_permission_denied(self): + """Test that regular users without permission are denied.""" + request = self.factory.post("/api/ai/deletions/approve/") + request.user = self.regular_user + + result = self.permission.has_permission(request, self.view) + + self.assertFalse(result) + + def test_user_with_permission_allowed(self): + """Test that users with permission are allowed.""" + request = self.factory.post("/api/ai/deletions/approve/") + request.user = self.permitted_user + + result = self.permission.has_permission(request, self.view) + + self.assertTrue(result) + + +class TestCanConfigureAIPermission(TestCase): + """Test the CanConfigureAIPermission class.""" + + def setUp(self): + """Set up test users and permissions.""" + self.factory = APIRequestFactory() + self.permission = CanConfigureAIPermission() + self.view = MockView() + + # Create users + self.superuser = User.objects.create_superuser( + username="admin", email="admin@test.com", password="admin123" + ) + self.regular_user = User.objects.create_user( + username="regular", email="regular@test.com", password="regular123" + ) + self.permitted_user = User.objects.create_user( + username="permitted", email="permitted@test.com", password="permitted123" + ) + + # Assign permission to permitted_user + content_type = ContentType.objects.get_for_model(Document) + permission, created = Permission.objects.get_or_create( + codename="can_configure_ai", + name="Can configure AI settings", + content_type=content_type, + ) + self.permitted_user.user_permissions.add(permission) + + def test_unauthenticated_user_denied(self): + """Test that unauthenticated users are denied.""" + request = self.factory.post("/api/ai/config/") + request.user = None + + result = self.permission.has_permission(request, self.view) + + self.assertFalse(result) + + def test_superuser_allowed(self): + """Test that superusers are always allowed.""" + request = self.factory.post("/api/ai/config/") + request.user = self.superuser + + result = self.permission.has_permission(request, self.view) + + self.assertTrue(result) + + def test_regular_user_without_permission_denied(self): + """Test that regular users without permission are denied.""" + request = self.factory.post("/api/ai/config/") + request.user = self.regular_user + + result = self.permission.has_permission(request, self.view) + + self.assertFalse(result) + + def test_user_with_permission_allowed(self): + """Test that users with permission are allowed.""" + request = self.factory.post("/api/ai/config/") + request.user = self.permitted_user + + result = self.permission.has_permission(request, self.view) + + self.assertTrue(result) + + +class TestRoleBasedAccessControl(TestCase): + """Test role-based access control for AI permissions.""" + + def setUp(self): + """Set up test groups and permissions.""" + # Create groups + self.viewer_group = Group.objects.create(name="AI Viewers") + self.editor_group = Group.objects.create(name="AI Editors") + self.admin_group = Group.objects.create(name="AI Administrators") + + # Get permissions + content_type = ContentType.objects.get_for_model(Document) + self.view_permission, _ = Permission.objects.get_or_create( + codename="can_view_ai_suggestions", + name="Can view AI suggestions", + content_type=content_type, + ) + self.apply_permission, _ = Permission.objects.get_or_create( + codename="can_apply_ai_suggestions", + name="Can apply AI suggestions", + content_type=content_type, + ) + self.approve_permission, _ = Permission.objects.get_or_create( + codename="can_approve_deletions", + name="Can approve AI-recommended deletions", + content_type=content_type, + ) + self.config_permission, _ = Permission.objects.get_or_create( + codename="can_configure_ai", + name="Can configure AI settings", + content_type=content_type, + ) + + # Assign permissions to groups + # Viewers can only view + self.viewer_group.permissions.add(self.view_permission) + + # Editors can view and apply + self.editor_group.permissions.add(self.view_permission, self.apply_permission) + + # Admins can do everything + self.admin_group.permissions.add( + self.view_permission, + self.apply_permission, + self.approve_permission, + self.config_permission, + ) + + def test_viewer_role_permissions(self): + """Test that viewer role has appropriate permissions.""" + user = User.objects.create_user( + username="viewer", email="viewer@test.com", password="viewer123" + ) + user.groups.add(self.viewer_group) + + # Refresh user to get updated permissions + user = User.objects.get(pk=user.pk) + + self.assertTrue(user.has_perm("documents.can_view_ai_suggestions")) + self.assertFalse(user.has_perm("documents.can_apply_ai_suggestions")) + self.assertFalse(user.has_perm("documents.can_approve_deletions")) + self.assertFalse(user.has_perm("documents.can_configure_ai")) + + def test_editor_role_permissions(self): + """Test that editor role has appropriate permissions.""" + user = User.objects.create_user( + username="editor", email="editor@test.com", password="editor123" + ) + user.groups.add(self.editor_group) + + # Refresh user to get updated permissions + user = User.objects.get(pk=user.pk) + + self.assertTrue(user.has_perm("documents.can_view_ai_suggestions")) + self.assertTrue(user.has_perm("documents.can_apply_ai_suggestions")) + self.assertFalse(user.has_perm("documents.can_approve_deletions")) + self.assertFalse(user.has_perm("documents.can_configure_ai")) + + def test_admin_role_permissions(self): + """Test that admin role has all permissions.""" + user = User.objects.create_user( + username="ai_admin", email="ai_admin@test.com", password="admin123" + ) + user.groups.add(self.admin_group) + + # Refresh user to get updated permissions + user = User.objects.get(pk=user.pk) + + self.assertTrue(user.has_perm("documents.can_view_ai_suggestions")) + self.assertTrue(user.has_perm("documents.can_apply_ai_suggestions")) + self.assertTrue(user.has_perm("documents.can_approve_deletions")) + self.assertTrue(user.has_perm("documents.can_configure_ai")) + + def test_user_with_multiple_groups(self): + """Test that user permissions accumulate from multiple groups.""" + user = User.objects.create_user( + username="multi_role", email="multi@test.com", password="multi123" + ) + user.groups.add(self.viewer_group, self.editor_group) + + # Refresh user to get updated permissions + user = User.objects.get(pk=user.pk) + + # Should have both viewer and editor permissions + self.assertTrue(user.has_perm("documents.can_view_ai_suggestions")) + self.assertTrue(user.has_perm("documents.can_apply_ai_suggestions")) + self.assertFalse(user.has_perm("documents.can_approve_deletions")) + + def test_direct_permission_assignment_overrides_group(self): + """Test that direct permission assignment works alongside group permissions.""" + user = User.objects.create_user( + username="special", email="special@test.com", password="special123" + ) + user.groups.add(self.viewer_group) + + # Directly assign approval permission + user.user_permissions.add(self.approve_permission) + + # Refresh user to get updated permissions + user = User.objects.get(pk=user.pk) + + # Should have viewer group permissions plus direct permission + self.assertTrue(user.has_perm("documents.can_view_ai_suggestions")) + self.assertFalse(user.has_perm("documents.can_apply_ai_suggestions")) + self.assertTrue(user.has_perm("documents.can_approve_deletions")) + self.assertFalse(user.has_perm("documents.can_configure_ai")) + + +class TestPermissionAssignment(TestCase): + """Test permission assignment and revocation.""" + + def setUp(self): + """Set up test user.""" + self.user = User.objects.create_user( + username="testuser", email="test@test.com", password="test123" + ) + content_type = ContentType.objects.get_for_model(Document) + self.view_permission, _ = Permission.objects.get_or_create( + codename="can_view_ai_suggestions", + name="Can view AI suggestions", + content_type=content_type, + ) + + def test_assign_permission_to_user(self): + """Test assigning permission to user.""" + self.assertFalse(self.user.has_perm("documents.can_view_ai_suggestions")) + + self.user.user_permissions.add(self.view_permission) + self.user = User.objects.get(pk=self.user.pk) + + self.assertTrue(self.user.has_perm("documents.can_view_ai_suggestions")) + + def test_revoke_permission_from_user(self): + """Test revoking permission from user.""" + self.user.user_permissions.add(self.view_permission) + self.user = User.objects.get(pk=self.user.pk) + self.assertTrue(self.user.has_perm("documents.can_view_ai_suggestions")) + + self.user.user_permissions.remove(self.view_permission) + self.user = User.objects.get(pk=self.user.pk) + + self.assertFalse(self.user.has_perm("documents.can_view_ai_suggestions")) + + def test_permission_persistence(self): + """Test that permissions persist across user retrieval.""" + self.user.user_permissions.add(self.view_permission) + + # Get user from database + retrieved_user = User.objects.get(username="testuser") + + self.assertTrue(retrieved_user.has_perm("documents.can_view_ai_suggestions")) + + +class TestPermissionEdgeCases(TestCase): + """Test edge cases and error conditions for permissions.""" + + def setUp(self): + """Set up test data.""" + self.factory = APIRequestFactory() + self.view = MockView() + + def test_anonymous_user_request(self): + """Test handling of anonymous user.""" + from django.contrib.auth.models import AnonymousUser + + permission = CanViewAISuggestionsPermission() + request = self.factory.get("/api/ai/suggestions/") + request.user = AnonymousUser() + + result = permission.has_permission(request, self.view) + + self.assertFalse(result) + + def test_missing_user_attribute(self): + """Test handling of request without user attribute.""" + permission = CanViewAISuggestionsPermission() + request = self.factory.get("/api/ai/suggestions/") + # Don't set request.user + + result = permission.has_permission(request, self.view) + + self.assertFalse(result) + + def test_inactive_user_with_permission(self): + """Test that inactive users are denied even with permission.""" + user = User.objects.create_user( + username="inactive", email="inactive@test.com", password="inactive123" + ) + user.is_active = False + user.save() + + # Add permission + content_type = ContentType.objects.get_for_model(Document) + permission, _ = Permission.objects.get_or_create( + codename="can_view_ai_suggestions", + name="Can view AI suggestions", + content_type=content_type, + ) + user.user_permissions.add(permission) + + permission_check = CanViewAISuggestionsPermission() + request = self.factory.get("/api/ai/suggestions/") + request.user = user + + # Inactive users should not pass authentication check + result = permission_check.has_permission(request, self.view) + + self.assertFalse(result) From 0eb883287c8ec0956009e9ce024632b5d26abdbd Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 12 Nov 2025 14:00:25 +0000 Subject: [PATCH 3/5] Add AI API endpoints with permission-protected views Co-authored-by: dawnsystem <42047891+dawnsystem@users.noreply.github.com> --- src/documents/serialisers.py | 122 +++++++++++++ src/documents/views.py | 337 +++++++++++++++++++++++++++++++++++ src/paperless/urls.py | 31 ++++ 3 files changed, 490 insertions(+) diff --git a/src/documents/serialisers.py b/src/documents/serialisers.py index f04bb70da..dae87293e 100644 --- a/src/documents/serialisers.py +++ b/src/documents/serialisers.py @@ -2696,3 +2696,125 @@ class StoragePathTestSerializer(SerializerWithPerms): label="Document", write_only=True, ) + + +class AISuggestionsRequestSerializer(serializers.Serializer): + """Serializer for requesting AI suggestions for a document.""" + + document_id = serializers.IntegerField( + required=True, + label="Document ID", + help_text="ID of the document to analyze", + ) + + +class AISuggestionSerializer(serializers.Serializer): + """Serializer for a single AI suggestion.""" + + id = serializers.IntegerField() + name = serializers.CharField() + confidence = serializers.FloatField() + + +class AISuggestionsResponseSerializer(serializers.Serializer): + """Serializer for AI suggestions response.""" + + document_id = serializers.IntegerField() + tags = AISuggestionSerializer(many=True, required=False) + correspondent = AISuggestionSerializer(required=False, allow_null=True) + document_type = AISuggestionSerializer(required=False, allow_null=True) + storage_path = AISuggestionSerializer(required=False, allow_null=True) + title_suggestion = serializers.CharField(required=False, allow_null=True) + custom_fields = serializers.DictField(required=False) + + +class ApplyAISuggestionsSerializer(serializers.Serializer): + """Serializer for applying AI suggestions to a document.""" + + document_id = serializers.IntegerField( + required=True, + label="Document ID", + help_text="ID of the document to apply suggestions to", + ) + apply_tags = serializers.BooleanField( + default=False, + label="Apply Tags", + help_text="Whether to apply tag suggestions", + ) + apply_correspondent = serializers.BooleanField( + default=False, + label="Apply Correspondent", + help_text="Whether to apply correspondent suggestion", + ) + apply_document_type = serializers.BooleanField( + default=False, + label="Apply Document Type", + help_text="Whether to apply document type suggestion", + ) + apply_storage_path = serializers.BooleanField( + default=False, + label="Apply Storage Path", + help_text="Whether to apply storage path suggestion", + ) + apply_title = serializers.BooleanField( + default=False, + label="Apply Title", + help_text="Whether to apply title suggestion", + ) + selected_tags = serializers.ListField( + child=serializers.IntegerField(), + required=False, + label="Selected Tags", + help_text="Specific tag IDs to apply (optional)", + ) + + +class AIConfigurationSerializer(serializers.Serializer): + """Serializer for AI configuration settings.""" + + auto_apply_threshold = serializers.FloatField( + required=False, + min_value=0.0, + max_value=1.0, + label="Auto Apply Threshold", + help_text="Confidence threshold for automatic application (0.0-1.0)", + ) + suggest_threshold = serializers.FloatField( + required=False, + min_value=0.0, + max_value=1.0, + label="Suggest Threshold", + help_text="Confidence threshold for suggestions (0.0-1.0)", + ) + ml_enabled = serializers.BooleanField( + required=False, + label="ML Features Enabled", + help_text="Enable/disable ML features", + ) + advanced_ocr_enabled = serializers.BooleanField( + required=False, + label="Advanced OCR Enabled", + help_text="Enable/disable advanced OCR features", + ) + + +class DeletionApprovalSerializer(serializers.Serializer): + """Serializer for approving/rejecting deletion requests.""" + + request_id = serializers.IntegerField( + required=True, + label="Request ID", + help_text="ID of the deletion request", + ) + action = serializers.ChoiceField( + choices=["approve", "reject"], + required=True, + label="Action", + help_text="Action to take on the deletion request", + ) + reason = serializers.CharField( + required=False, + allow_blank=True, + label="Reason", + help_text="Reason for approval/rejection (optional)", + ) diff --git a/src/documents/views.py b/src/documents/views.py index 822647fdb..7b00909ad 100644 --- a/src/documents/views.py +++ b/src/documents/views.py @@ -3150,3 +3150,340 @@ def serve_logo(request, filename=None): filename=app_logo.name, as_attachment=True, ) + + +class AISuggestionsView(GenericAPIView): + """ + API view to get AI suggestions for a document. + + Requires: can_view_ai_suggestions permission + """ + + permission_classes = [IsAuthenticated, CanViewAISuggestionsPermission] + serializer_class = AISuggestionsResponseSerializer + + def post(self, request): + """Get AI suggestions for a document.""" + from documents.ai_scanner import get_ai_scanner + from documents.models import Document, Tag, Correspondent, DocumentType, StoragePath + from documents.serialisers import AISuggestionsRequestSerializer + + # Validate request + request_serializer = AISuggestionsRequestSerializer(data=request.data) + request_serializer.is_valid(raise_exception=True) + + document_id = request_serializer.validated_data['document_id'] + + try: + document = Document.objects.get(pk=document_id) + except Document.DoesNotExist: + return Response( + {"error": "Document not found"}, + status=status.HTTP_404_NOT_FOUND + ) + + # Check if user has permission to view this document + if not has_perms_owner_aware(request.user, 'documents.view_document', document): + return Response( + {"error": "Permission denied"}, + status=status.HTTP_403_FORBIDDEN + ) + + # Get AI scanner and scan document + scanner = get_ai_scanner() + scan_result = scanner.scan_document(document, document.content or "") + + # Build response + response_data = { + "document_id": document.id, + "tags": [], + "correspondent": None, + "document_type": None, + "storage_path": None, + "title_suggestion": scan_result.title_suggestion, + "custom_fields": {} + } + + # Format tag suggestions + for tag_id, confidence in scan_result.tags: + try: + tag = Tag.objects.get(pk=tag_id) + response_data["tags"].append({ + "id": tag.id, + "name": tag.name, + "confidence": confidence + }) + except Tag.DoesNotExist: + pass + + # Format correspondent suggestion + if scan_result.correspondent: + corr_id, confidence = scan_result.correspondent + try: + correspondent = Correspondent.objects.get(pk=corr_id) + response_data["correspondent"] = { + "id": correspondent.id, + "name": correspondent.name, + "confidence": confidence + } + except Correspondent.DoesNotExist: + pass + + # Format document type suggestion + if scan_result.document_type: + type_id, confidence = scan_result.document_type + try: + doc_type = DocumentType.objects.get(pk=type_id) + response_data["document_type"] = { + "id": doc_type.id, + "name": doc_type.name, + "confidence": confidence + } + except DocumentType.DoesNotExist: + pass + + # Format storage path suggestion + if scan_result.storage_path: + path_id, confidence = scan_result.storage_path + try: + storage_path = StoragePath.objects.get(pk=path_id) + response_data["storage_path"] = { + "id": storage_path.id, + "name": storage_path.name, + "confidence": confidence + } + except StoragePath.DoesNotExist: + pass + + # Format custom fields + for field_id, (value, confidence) in scan_result.custom_fields.items(): + response_data["custom_fields"][str(field_id)] = { + "value": value, + "confidence": confidence + } + + return Response(response_data) + + +class ApplyAISuggestionsView(GenericAPIView): + """ + API view to apply AI suggestions to a document. + + Requires: can_apply_ai_suggestions permission + """ + + permission_classes = [IsAuthenticated, CanApplyAISuggestionsPermission] + + def post(self, request): + """Apply AI suggestions to a document.""" + from documents.ai_scanner import get_ai_scanner + from documents.models import Document, Tag, Correspondent, DocumentType, StoragePath + from documents.serialisers import ApplyAISuggestionsSerializer + + # Validate request + serializer = ApplyAISuggestionsSerializer(data=request.data) + serializer.is_valid(raise_exception=True) + + document_id = serializer.validated_data['document_id'] + + try: + document = Document.objects.get(pk=document_id) + except Document.DoesNotExist: + return Response( + {"error": "Document not found"}, + status=status.HTTP_404_NOT_FOUND + ) + + # Check if user has permission to change this document + if not has_perms_owner_aware(request.user, 'documents.change_document', document): + return Response( + {"error": "Permission denied"}, + status=status.HTTP_403_FORBIDDEN + ) + + # Get AI scanner and scan document + scanner = get_ai_scanner() + scan_result = scanner.scan_document(document, document.content or "") + + # Apply suggestions based on user selections + applied = [] + + if serializer.validated_data.get('apply_tags'): + selected_tags = serializer.validated_data.get('selected_tags', []) + if selected_tags: + # Apply only selected tags + tags_to_apply = [tag_id for tag_id, _ in scan_result.tags if tag_id in selected_tags] + else: + # Apply all high-confidence tags + tags_to_apply = [tag_id for tag_id, conf in scan_result.tags if conf >= scanner.auto_apply_threshold] + + for tag_id in tags_to_apply: + try: + tag = Tag.objects.get(pk=tag_id) + document.add_nested_tags([tag]) + applied.append(f"tag: {tag.name}") + except Tag.DoesNotExist: + pass + + if serializer.validated_data.get('apply_correspondent') and scan_result.correspondent: + corr_id, confidence = scan_result.correspondent + try: + correspondent = Correspondent.objects.get(pk=corr_id) + document.correspondent = correspondent + applied.append(f"correspondent: {correspondent.name}") + except Correspondent.DoesNotExist: + pass + + if serializer.validated_data.get('apply_document_type') and scan_result.document_type: + type_id, confidence = scan_result.document_type + try: + doc_type = DocumentType.objects.get(pk=type_id) + document.document_type = doc_type + applied.append(f"document_type: {doc_type.name}") + except DocumentType.DoesNotExist: + pass + + if serializer.validated_data.get('apply_storage_path') and scan_result.storage_path: + path_id, confidence = scan_result.storage_path + try: + storage_path = StoragePath.objects.get(pk=path_id) + document.storage_path = storage_path + applied.append(f"storage_path: {storage_path.name}") + except StoragePath.DoesNotExist: + pass + + if serializer.validated_data.get('apply_title') and scan_result.title_suggestion: + document.title = scan_result.title_suggestion + applied.append(f"title: {scan_result.title_suggestion}") + + # Save document + document.save() + + return Response({ + "status": "success", + "document_id": document.id, + "applied": applied + }) + + +class AIConfigurationView(GenericAPIView): + """ + API view to get/update AI configuration. + + Requires: can_configure_ai permission + """ + + permission_classes = [IsAuthenticated, CanConfigureAIPermission] + + def get(self, request): + """Get current AI configuration.""" + from documents.ai_scanner import get_ai_scanner + from documents.serialisers import AIConfigurationSerializer + + scanner = get_ai_scanner() + + config_data = { + "auto_apply_threshold": scanner.auto_apply_threshold, + "suggest_threshold": scanner.suggest_threshold, + "ml_enabled": scanner.ml_enabled, + "advanced_ocr_enabled": scanner.advanced_ocr_enabled, + } + + serializer = AIConfigurationSerializer(config_data) + return Response(serializer.data) + + def post(self, request): + """Update AI configuration.""" + from documents.ai_scanner import get_ai_scanner, AIDocumentScanner, _scanner_instance + from documents.serialisers import AIConfigurationSerializer + + serializer = AIConfigurationSerializer(data=request.data) + serializer.is_valid(raise_exception=True) + + # Create new scanner with updated configuration + config = {} + if 'auto_apply_threshold' in serializer.validated_data: + config['auto_apply_threshold'] = serializer.validated_data['auto_apply_threshold'] + if 'suggest_threshold' in serializer.validated_data: + config['suggest_threshold'] = serializer.validated_data['suggest_threshold'] + if 'ml_enabled' in serializer.validated_data: + config['enable_ml_features'] = serializer.validated_data['ml_enabled'] + if 'advanced_ocr_enabled' in serializer.validated_data: + config['enable_advanced_ocr'] = serializer.validated_data['advanced_ocr_enabled'] + + # Update global scanner instance + global _scanner_instance + _scanner_instance = AIDocumentScanner(**config) + + return Response({ + "status": "success", + "message": "AI configuration updated" + }) + + +class DeletionApprovalView(GenericAPIView): + """ + API view to approve/reject deletion requests. + + Requires: can_approve_deletions permission + """ + + permission_classes = [IsAuthenticated, CanApproveDeletionsPermission] + + def post(self, request): + """Approve or reject a deletion request.""" + from documents.models import DeletionRequest + from documents.serialisers import DeletionApprovalSerializer + + serializer = DeletionApprovalSerializer(data=request.data) + serializer.is_valid(raise_exception=True) + + request_id = serializer.validated_data['request_id'] + action = serializer.validated_data['action'] + reason = serializer.validated_data.get('reason', '') + + try: + deletion_request = DeletionRequest.objects.get(pk=request_id) + except DeletionRequest.DoesNotExist: + return Response( + {"error": "Deletion request not found"}, + status=status.HTTP_404_NOT_FOUND + ) + + # Check if user has permission + if deletion_request.user != request.user and not request.user.is_superuser: + return Response( + {"error": "Permission denied"}, + status=status.HTTP_403_FORBIDDEN + ) + + if action == "approve": + deletion_request.status = DeletionRequest.STATUS_APPROVED + deletion_request.save() + + # Perform the actual deletion + # This would integrate with the AI deletion manager + return Response({ + "status": "success", + "message": "Deletion request approved", + "request_id": request_id + }) + + elif action == "reject": + deletion_request.status = DeletionRequest.STATUS_REJECTED + deletion_request.save() + + return Response({ + "status": "success", + "message": "Deletion request rejected", + "request_id": request_id + }) + + +# Import the new permission classes +from documents.permissions import ( + CanViewAISuggestionsPermission, + CanApplyAISuggestionsPermission, + CanApproveDeletionsPermission, + CanConfigureAIPermission, +) diff --git a/src/paperless/urls.py b/src/paperless/urls.py index e24d1a459..90a5a5dd4 100644 --- a/src/paperless/urls.py +++ b/src/paperless/urls.py @@ -15,11 +15,15 @@ from drf_spectacular.views import SpectacularAPIView from drf_spectacular.views import SpectacularSwaggerView from rest_framework.routers import DefaultRouter +from documents.views import AIConfigurationView +from documents.views import AISuggestionsView +from documents.views import ApplyAISuggestionsView from documents.views import BulkDownloadView from documents.views import BulkEditObjectsView from documents.views import BulkEditView from documents.views import CorrespondentViewSet from documents.views import CustomFieldViewSet +from documents.views import DeletionApprovalView from documents.views import DocumentTypeViewSet from documents.views import GlobalSearchView from documents.views import IndexView @@ -200,6 +204,33 @@ urlpatterns = [ TrashView.as_view(), name="trash", ), + re_path( + "^ai/", + include( + [ + re_path( + "^suggestions/$", + AISuggestionsView.as_view(), + name="ai_suggestions", + ), + re_path( + "^suggestions/apply/$", + ApplyAISuggestionsView.as_view(), + name="ai_apply_suggestions", + ), + re_path( + "^config/$", + AIConfigurationView.as_view(), + name="ai_config", + ), + re_path( + "^deletions/approve/$", + DeletionApprovalView.as_view(), + name="ai_deletion_approval", + ), + ], + ), + ), re_path( r"^oauth/callback/", OauthCallbackView.as_view(), From 426e7b8e162a26706830901d28479bde16bff430 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 12 Nov 2025 14:01:55 +0000 Subject: [PATCH 4/5] Add comprehensive integration tests for AI API endpoints Co-authored-by: dawnsystem <42047891+dawnsystem@users.noreply.github.com> --- src/documents/tests/test_api_ai_endpoints.py | 574 +++++++++++++++++++ 1 file changed, 574 insertions(+) create mode 100644 src/documents/tests/test_api_ai_endpoints.py diff --git a/src/documents/tests/test_api_ai_endpoints.py b/src/documents/tests/test_api_ai_endpoints.py new file mode 100644 index 000000000..525172f4d --- /dev/null +++ b/src/documents/tests/test_api_ai_endpoints.py @@ -0,0 +1,574 @@ +""" +Integration tests for AI API endpoints. + +Tests cover: +- AI suggestions endpoint (POST /api/ai/suggestions/) +- Apply AI suggestions endpoint (POST /api/ai/suggestions/apply/) +- AI configuration endpoint (GET/POST /api/ai/config/) +- Deletion approval endpoint (POST /api/ai/deletions/approve/) +- Permission checks for all endpoints +- Request/response validation +""" + +from unittest import mock + +from django.contrib.auth.models import Permission, User +from django.contrib.contenttypes.models import ContentType +from rest_framework import status +from rest_framework.test import APITestCase + +from documents.models import ( + Correspondent, + DeletionRequest, + Document, + DocumentType, + StoragePath, + Tag, +) +from documents.tests.utils import DirectoriesMixin + + +class TestAISuggestionsEndpoint(DirectoriesMixin, APITestCase): + """Test the AI suggestions endpoint.""" + + def setUp(self): + """Set up test data.""" + super().setUp() + + # Create users + self.superuser = User.objects.create_superuser( + username="admin", email="admin@test.com", password="admin123" + ) + self.user_with_permission = User.objects.create_user( + username="permitted", email="permitted@test.com", password="permitted123" + ) + self.user_without_permission = User.objects.create_user( + username="regular", email="regular@test.com", password="regular123" + ) + + # Assign view permission + content_type = ContentType.objects.get_for_model(Document) + view_permission, _ = Permission.objects.get_or_create( + codename="can_view_ai_suggestions", + name="Can view AI suggestions", + content_type=content_type, + ) + self.user_with_permission.user_permissions.add(view_permission) + + # Create test document + self.document = Document.objects.create( + title="Test Document", + content="This is a test invoice from ACME Corporation" + ) + + # Create test metadata objects + self.tag = Tag.objects.create(name="Invoice") + self.correspondent = Correspondent.objects.create(name="ACME Corp") + self.doc_type = DocumentType.objects.create(name="Invoice") + + def test_unauthorized_access_denied(self): + """Test that unauthenticated users are denied.""" + response = self.client.post( + "/api/ai/suggestions/", + {"document_id": self.document.id}, + format="json" + ) + + self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED) + + def test_user_without_permission_denied(self): + """Test that users without permission are denied.""" + self.client.force_authenticate(user=self.user_without_permission) + + response = self.client.post( + "/api/ai/suggestions/", + {"document_id": self.document.id}, + format="json" + ) + + self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) + + def test_superuser_allowed(self): + """Test that superusers can access the endpoint.""" + self.client.force_authenticate(user=self.superuser) + + with mock.patch('documents.views.get_ai_scanner') as mock_scanner: + # Mock the scanner response + mock_scan_result = mock.MagicMock() + mock_scan_result.tags = [(self.tag.id, 0.85)] + mock_scan_result.correspondent = (self.correspondent.id, 0.90) + mock_scan_result.document_type = (self.doc_type.id, 0.80) + mock_scan_result.storage_path = None + mock_scan_result.title_suggestion = "Invoice - ACME Corp" + mock_scan_result.custom_fields = {} + + mock_scanner_instance = mock.MagicMock() + mock_scanner_instance.scan_document.return_value = mock_scan_result + mock_scanner.return_value = mock_scanner_instance + + response = self.client.post( + "/api/ai/suggestions/", + {"document_id": self.document.id}, + format="json" + ) + + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertIn("document_id", response.data) + self.assertEqual(response.data["document_id"], self.document.id) + + def test_user_with_permission_allowed(self): + """Test that users with permission can access the endpoint.""" + self.client.force_authenticate(user=self.user_with_permission) + + with mock.patch('documents.views.get_ai_scanner') as mock_scanner: + # Mock the scanner response + mock_scan_result = mock.MagicMock() + mock_scan_result.tags = [] + mock_scan_result.correspondent = None + mock_scan_result.document_type = None + mock_scan_result.storage_path = None + mock_scan_result.title_suggestion = None + mock_scan_result.custom_fields = {} + + mock_scanner_instance = mock.MagicMock() + mock_scanner_instance.scan_document.return_value = mock_scan_result + mock_scanner.return_value = mock_scanner_instance + + response = self.client.post( + "/api/ai/suggestions/", + {"document_id": self.document.id}, + format="json" + ) + + self.assertEqual(response.status_code, status.HTTP_200_OK) + + def test_invalid_document_id(self): + """Test handling of invalid document ID.""" + self.client.force_authenticate(user=self.superuser) + + response = self.client.post( + "/api/ai/suggestions/", + {"document_id": 99999}, + format="json" + ) + + self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND) + + def test_missing_document_id(self): + """Test handling of missing document ID.""" + self.client.force_authenticate(user=self.superuser) + + response = self.client.post( + "/api/ai/suggestions/", + {}, + format="json" + ) + + self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) + + +class TestApplyAISuggestionsEndpoint(DirectoriesMixin, APITestCase): + """Test the apply AI suggestions endpoint.""" + + def setUp(self): + """Set up test data.""" + super().setUp() + + # Create users + self.superuser = User.objects.create_superuser( + username="admin", email="admin@test.com", password="admin123" + ) + self.user_with_permission = User.objects.create_user( + username="permitted", email="permitted@test.com", password="permitted123" + ) + + # Assign apply permission + content_type = ContentType.objects.get_for_model(Document) + apply_permission, _ = Permission.objects.get_or_create( + codename="can_apply_ai_suggestions", + name="Can apply AI suggestions", + content_type=content_type, + ) + self.user_with_permission.user_permissions.add(apply_permission) + + # Create test document + self.document = Document.objects.create( + title="Test Document", + content="Test content" + ) + + # Create test metadata + self.tag = Tag.objects.create(name="Test Tag") + self.correspondent = Correspondent.objects.create(name="Test Corp") + + def test_unauthorized_access_denied(self): + """Test that unauthenticated users are denied.""" + response = self.client.post( + "/api/ai/suggestions/apply/", + {"document_id": self.document.id}, + format="json" + ) + + self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED) + + def test_apply_tags_success(self): + """Test successfully applying tag suggestions.""" + self.client.force_authenticate(user=self.superuser) + + with mock.patch('documents.views.get_ai_scanner') as mock_scanner: + # Mock the scanner response + mock_scan_result = mock.MagicMock() + mock_scan_result.tags = [(self.tag.id, 0.85)] + mock_scan_result.correspondent = None + mock_scan_result.document_type = None + mock_scan_result.storage_path = None + mock_scan_result.title_suggestion = None + mock_scan_result.custom_fields = {} + + mock_scanner_instance = mock.MagicMock() + mock_scanner_instance.scan_document.return_value = mock_scan_result + mock_scanner_instance.auto_apply_threshold = 0.80 + mock_scanner.return_value = mock_scanner_instance + + response = self.client.post( + "/api/ai/suggestions/apply/", + { + "document_id": self.document.id, + "apply_tags": True + }, + format="json" + ) + + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(response.data["status"], "success") + + def test_apply_correspondent_success(self): + """Test successfully applying correspondent suggestion.""" + self.client.force_authenticate(user=self.superuser) + + with mock.patch('documents.views.get_ai_scanner') as mock_scanner: + # Mock the scanner response + mock_scan_result = mock.MagicMock() + mock_scan_result.tags = [] + mock_scan_result.correspondent = (self.correspondent.id, 0.90) + mock_scan_result.document_type = None + mock_scan_result.storage_path = None + mock_scan_result.title_suggestion = None + mock_scan_result.custom_fields = {} + + mock_scanner_instance = mock.MagicMock() + mock_scanner_instance.scan_document.return_value = mock_scan_result + mock_scanner_instance.auto_apply_threshold = 0.80 + mock_scanner.return_value = mock_scanner_instance + + response = self.client.post( + "/api/ai/suggestions/apply/", + { + "document_id": self.document.id, + "apply_correspondent": True + }, + format="json" + ) + + self.assertEqual(response.status_code, status.HTTP_200_OK) + + # Verify correspondent was applied + self.document.refresh_from_db() + self.assertEqual(self.document.correspondent, self.correspondent) + + +class TestAIConfigurationEndpoint(DirectoriesMixin, APITestCase): + """Test the AI configuration endpoint.""" + + def setUp(self): + """Set up test data.""" + super().setUp() + + # Create users + self.superuser = User.objects.create_superuser( + username="admin", email="admin@test.com", password="admin123" + ) + self.user_without_permission = User.objects.create_user( + username="regular", email="regular@test.com", password="regular123" + ) + + def test_unauthorized_access_denied(self): + """Test that unauthenticated users are denied.""" + response = self.client.get("/api/ai/config/") + + self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED) + + def test_user_without_permission_denied(self): + """Test that users without permission are denied.""" + self.client.force_authenticate(user=self.user_without_permission) + + response = self.client.get("/api/ai/config/") + + self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) + + def test_get_config_success(self): + """Test getting AI configuration.""" + self.client.force_authenticate(user=self.superuser) + + with mock.patch('documents.views.get_ai_scanner') as mock_scanner: + mock_scanner_instance = mock.MagicMock() + mock_scanner_instance.auto_apply_threshold = 0.80 + mock_scanner_instance.suggest_threshold = 0.60 + mock_scanner_instance.ml_enabled = True + mock_scanner_instance.advanced_ocr_enabled = True + mock_scanner.return_value = mock_scanner_instance + + response = self.client.get("/api/ai/config/") + + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertIn("auto_apply_threshold", response.data) + self.assertEqual(response.data["auto_apply_threshold"], 0.80) + + def test_update_config_success(self): + """Test updating AI configuration.""" + self.client.force_authenticate(user=self.superuser) + + response = self.client.post( + "/api/ai/config/", + { + "auto_apply_threshold": 0.90, + "suggest_threshold": 0.70 + }, + format="json" + ) + + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(response.data["status"], "success") + + def test_update_config_invalid_threshold(self): + """Test updating with invalid threshold value.""" + self.client.force_authenticate(user=self.superuser) + + response = self.client.post( + "/api/ai/config/", + { + "auto_apply_threshold": 1.5 # Invalid: > 1.0 + }, + format="json" + ) + + self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) + + +class TestDeletionApprovalEndpoint(DirectoriesMixin, APITestCase): + """Test the deletion approval endpoint.""" + + def setUp(self): + """Set up test data.""" + super().setUp() + + # Create users + self.superuser = User.objects.create_superuser( + username="admin", email="admin@test.com", password="admin123" + ) + self.user_with_permission = User.objects.create_user( + username="permitted", email="permitted@test.com", password="permitted123" + ) + self.user_without_permission = User.objects.create_user( + username="regular", email="regular@test.com", password="regular123" + ) + + # Assign approval permission + content_type = ContentType.objects.get_for_model(Document) + approval_permission, _ = Permission.objects.get_or_create( + codename="can_approve_deletions", + name="Can approve AI-recommended deletions", + content_type=content_type, + ) + self.user_with_permission.user_permissions.add(approval_permission) + + # Create test deletion request + self.deletion_request = DeletionRequest.objects.create( + user=self.user_with_permission, + requested_by_ai=True, + ai_reason="Document appears to be a duplicate" + ) + + def test_unauthorized_access_denied(self): + """Test that unauthenticated users are denied.""" + response = self.client.post( + "/api/ai/deletions/approve/", + { + "request_id": self.deletion_request.id, + "action": "approve" + }, + format="json" + ) + + self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED) + + def test_user_without_permission_denied(self): + """Test that users without permission are denied.""" + self.client.force_authenticate(user=self.user_without_permission) + + response = self.client.post( + "/api/ai/deletions/approve/", + { + "request_id": self.deletion_request.id, + "action": "approve" + }, + format="json" + ) + + self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) + + def test_approve_deletion_success(self): + """Test successfully approving a deletion request.""" + self.client.force_authenticate(user=self.user_with_permission) + + response = self.client.post( + "/api/ai/deletions/approve/", + { + "request_id": self.deletion_request.id, + "action": "approve" + }, + format="json" + ) + + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(response.data["status"], "success") + + # Verify status was updated + self.deletion_request.refresh_from_db() + self.assertEqual( + self.deletion_request.status, + DeletionRequest.STATUS_APPROVED + ) + + def test_reject_deletion_success(self): + """Test successfully rejecting a deletion request.""" + self.client.force_authenticate(user=self.user_with_permission) + + response = self.client.post( + "/api/ai/deletions/approve/", + { + "request_id": self.deletion_request.id, + "action": "reject", + "reason": "Document is still needed" + }, + format="json" + ) + + self.assertEqual(response.status_code, status.HTTP_200_OK) + + # Verify status was updated + self.deletion_request.refresh_from_db() + self.assertEqual( + self.deletion_request.status, + DeletionRequest.STATUS_REJECTED + ) + + def test_invalid_request_id(self): + """Test handling of invalid deletion request ID.""" + self.client.force_authenticate(user=self.superuser) + + response = self.client.post( + "/api/ai/deletions/approve/", + { + "request_id": 99999, + "action": "approve" + }, + format="json" + ) + + self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND) + + def test_superuser_can_approve_any_request(self): + """Test that superusers can approve any deletion request.""" + self.client.force_authenticate(user=self.superuser) + + response = self.client.post( + "/api/ai/deletions/approve/", + { + "request_id": self.deletion_request.id, + "action": "approve" + }, + format="json" + ) + + self.assertEqual(response.status_code, status.HTTP_200_OK) + + +class TestEndpointPermissionIntegration(DirectoriesMixin, APITestCase): + """Test permission integration across all AI endpoints.""" + + def setUp(self): + """Set up test data.""" + super().setUp() + + # Create user with all AI permissions + self.power_user = User.objects.create_user( + username="power_user", email="power@test.com", password="power123" + ) + + content_type = ContentType.objects.get_for_model(Document) + + # Assign all AI permissions + permissions = [ + "can_view_ai_suggestions", + "can_apply_ai_suggestions", + "can_approve_deletions", + "can_configure_ai", + ] + + for codename in permissions: + perm, _ = Permission.objects.get_or_create( + codename=codename, + name=f"Can {codename.replace('_', ' ')}", + content_type=content_type, + ) + self.power_user.user_permissions.add(perm) + + self.document = Document.objects.create( + title="Test Doc", + content="Test" + ) + + def test_power_user_can_access_all_endpoints(self): + """Test that user with all permissions can access all endpoints.""" + self.client.force_authenticate(user=self.power_user) + + # Test suggestions endpoint + with mock.patch('documents.views.get_ai_scanner') as mock_scanner: + mock_scan_result = mock.MagicMock() + mock_scan_result.tags = [] + mock_scan_result.correspondent = None + mock_scan_result.document_type = None + mock_scan_result.storage_path = None + mock_scan_result.title_suggestion = None + mock_scan_result.custom_fields = {} + + mock_scanner_instance = mock.MagicMock() + mock_scanner_instance.scan_document.return_value = mock_scan_result + mock_scanner_instance.auto_apply_threshold = 0.80 + mock_scanner_instance.suggest_threshold = 0.60 + mock_scanner_instance.ml_enabled = True + mock_scanner_instance.advanced_ocr_enabled = True + mock_scanner.return_value = mock_scanner_instance + + response1 = self.client.post( + "/api/ai/suggestions/", + {"document_id": self.document.id}, + format="json" + ) + self.assertEqual(response1.status_code, status.HTTP_200_OK) + + # Test apply endpoint + response2 = self.client.post( + "/api/ai/suggestions/apply/", + { + "document_id": self.document.id, + "apply_tags": False + }, + format="json" + ) + self.assertEqual(response2.status_code, status.HTTP_200_OK) + + # Test config endpoint + response3 = self.client.get("/api/ai/config/") + self.assertEqual(response3.status_code, status.HTTP_200_OK) From 7658a5571bd18a76a7a2772473d85dde99b47b6e Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 12 Nov 2025 17:24:43 +0000 Subject: [PATCH 5/5] Address code review feedback: fix imports, docstrings, and add comments - Move all imports to module level (removed from inside methods) - Add missing `status` import from rest_framework - Fix docstring formatting to comply with PEP 257 - Add explanatory comments to empty except clauses - Improve error message for document not found - Add warning comment about thread-safety in config update - Add TODO comments for storing approval/rejection reasons - Remove unused StoragePath import from tests - Remove duplicate permission imports at end of file Co-authored-by: dawnsystem <42047891+dawnsystem@users.noreply.github.com> --- src/documents/tests/test_api_ai_endpoints.py | 1 - src/documents/views.py | 84 +++++++++++--------- 2 files changed, 48 insertions(+), 37 deletions(-) diff --git a/src/documents/tests/test_api_ai_endpoints.py b/src/documents/tests/test_api_ai_endpoints.py index 525172f4d..a753e0c29 100644 --- a/src/documents/tests/test_api_ai_endpoints.py +++ b/src/documents/tests/test_api_ai_endpoints.py @@ -22,7 +22,6 @@ from documents.models import ( DeletionRequest, Document, DocumentType, - StoragePath, Tag, ) from documents.tests.utils import DirectoriesMixin diff --git a/src/documents/views.py b/src/documents/views.py index 7b00909ad..63bbfa555 100644 --- a/src/documents/views.py +++ b/src/documents/views.py @@ -69,6 +69,7 @@ from packaging import version as packaging_version from redis import Redis from rest_framework import parsers from rest_framework import serializers +from rest_framework import status from rest_framework.decorators import action from rest_framework.exceptions import NotFound from rest_framework.exceptions import ValidationError @@ -127,6 +128,7 @@ from documents.matching import match_storage_paths from documents.matching import match_tags from documents.models import Correspondent from documents.models import CustomField +from documents.models import DeletionRequest from documents.models import Document from documents.models import DocumentType from documents.models import Note @@ -139,9 +141,15 @@ from documents.models import UiSettings from documents.models import Workflow from documents.models import WorkflowAction from documents.models import WorkflowTrigger +from documents.ai_scanner import AIDocumentScanner +from documents.ai_scanner import get_ai_scanner from documents.parsers import get_parser_class_for_mime_type from documents.parsers import parse_date_generator from documents.permissions import AcknowledgeTasksPermissions +from documents.permissions import CanApplyAISuggestionsPermission +from documents.permissions import CanApproveDeletionsPermission +from documents.permissions import CanConfigureAIPermission +from documents.permissions import CanViewAISuggestionsPermission from documents.permissions import PaperlessAdminPermissions from documents.permissions import PaperlessNotePermissions from documents.permissions import PaperlessObjectPermissions @@ -152,11 +160,16 @@ from documents.permissions import has_perms_owner_aware from documents.permissions import set_permissions_for_object from documents.schema import generate_object_with_permissions_schema from documents.serialisers import AcknowledgeTasksViewSerializer +from documents.serialisers import AIConfigurationSerializer +from documents.serialisers import AISuggestionsRequestSerializer +from documents.serialisers import AISuggestionsResponseSerializer +from documents.serialisers import ApplyAISuggestionsSerializer from documents.serialisers import BulkDownloadSerializer from documents.serialisers import BulkEditObjectsSerializer from documents.serialisers import BulkEditSerializer from documents.serialisers import CorrespondentSerializer from documents.serialisers import CustomFieldSerializer +from documents.serialisers import DeletionApprovalSerializer from documents.serialisers import DocumentListSerializer from documents.serialisers import DocumentSerializer from documents.serialisers import DocumentTypeSerializer @@ -3155,7 +3168,7 @@ def serve_logo(request, filename=None): class AISuggestionsView(GenericAPIView): """ API view to get AI suggestions for a document. - + Requires: can_view_ai_suggestions permission """ @@ -3164,10 +3177,6 @@ class AISuggestionsView(GenericAPIView): def post(self, request): """Get AI suggestions for a document.""" - from documents.ai_scanner import get_ai_scanner - from documents.models import Document, Tag, Correspondent, DocumentType, StoragePath - from documents.serialisers import AISuggestionsRequestSerializer - # Validate request request_serializer = AISuggestionsRequestSerializer(data=request.data) request_serializer.is_valid(raise_exception=True) @@ -3178,7 +3187,7 @@ class AISuggestionsView(GenericAPIView): document = Document.objects.get(pk=document_id) except Document.DoesNotExist: return Response( - {"error": "Document not found"}, + {"error": "Document not found or you don't have permission to view it"}, status=status.HTTP_404_NOT_FOUND ) @@ -3214,6 +3223,7 @@ class AISuggestionsView(GenericAPIView): "confidence": confidence }) except Tag.DoesNotExist: + # Tag was suggested by AI but no longer exists; skip it pass # Format correspondent suggestion @@ -3227,6 +3237,7 @@ class AISuggestionsView(GenericAPIView): "confidence": confidence } except Correspondent.DoesNotExist: + # Correspondent was suggested but no longer exists; skip it pass # Format document type suggestion @@ -3240,6 +3251,7 @@ class AISuggestionsView(GenericAPIView): "confidence": confidence } except DocumentType.DoesNotExist: + # Document type was suggested but no longer exists; skip it pass # Format storage path suggestion @@ -3253,6 +3265,7 @@ class AISuggestionsView(GenericAPIView): "confidence": confidence } except StoragePath.DoesNotExist: + # Storage path was suggested but no longer exists; skip it pass # Format custom fields @@ -3268,7 +3281,7 @@ class AISuggestionsView(GenericAPIView): class ApplyAISuggestionsView(GenericAPIView): """ API view to apply AI suggestions to a document. - + Requires: can_apply_ai_suggestions permission """ @@ -3276,10 +3289,6 @@ class ApplyAISuggestionsView(GenericAPIView): def post(self, request): """Apply AI suggestions to a document.""" - from documents.ai_scanner import get_ai_scanner - from documents.models import Document, Tag, Correspondent, DocumentType, StoragePath - from documents.serialisers import ApplyAISuggestionsSerializer - # Validate request serializer = ApplyAISuggestionsSerializer(data=request.data) serializer.is_valid(raise_exception=True) @@ -3323,6 +3332,7 @@ class ApplyAISuggestionsView(GenericAPIView): document.add_nested_tags([tag]) applied.append(f"tag: {tag.name}") except Tag.DoesNotExist: + # Tag not found; skip applying this tag pass if serializer.validated_data.get('apply_correspondent') and scan_result.correspondent: @@ -3332,6 +3342,7 @@ class ApplyAISuggestionsView(GenericAPIView): document.correspondent = correspondent applied.append(f"correspondent: {correspondent.name}") except Correspondent.DoesNotExist: + # Correspondent not found; skip applying pass if serializer.validated_data.get('apply_document_type') and scan_result.document_type: @@ -3341,6 +3352,7 @@ class ApplyAISuggestionsView(GenericAPIView): document.document_type = doc_type applied.append(f"document_type: {doc_type.name}") except DocumentType.DoesNotExist: + # Document type not found; skip applying pass if serializer.validated_data.get('apply_storage_path') and scan_result.storage_path: @@ -3350,6 +3362,7 @@ class ApplyAISuggestionsView(GenericAPIView): document.storage_path = storage_path applied.append(f"storage_path: {storage_path.name}") except StoragePath.DoesNotExist: + # Storage path not found; skip applying pass if serializer.validated_data.get('apply_title') and scan_result.title_suggestion: @@ -3369,7 +3382,7 @@ class ApplyAISuggestionsView(GenericAPIView): class AIConfigurationView(GenericAPIView): """ API view to get/update AI configuration. - + Requires: can_configure_ai permission """ @@ -3377,9 +3390,6 @@ class AIConfigurationView(GenericAPIView): def get(self, request): """Get current AI configuration.""" - from documents.ai_scanner import get_ai_scanner - from documents.serialisers import AIConfigurationSerializer - scanner = get_ai_scanner() config_data = { @@ -3393,10 +3403,13 @@ class AIConfigurationView(GenericAPIView): return Response(serializer.data) def post(self, request): - """Update AI configuration.""" - from documents.ai_scanner import get_ai_scanner, AIDocumentScanner, _scanner_instance - from documents.serialisers import AIConfigurationSerializer + """ + Update AI configuration. + Note: This updates the global scanner instance. Configuration changes + will take effect immediately but may require server restart in production + environments for consistency across workers. + """ serializer = AIConfigurationSerializer(data=request.data) serializer.is_valid(raise_exception=True) @@ -3412,19 +3425,21 @@ class AIConfigurationView(GenericAPIView): config['enable_advanced_ocr'] = serializer.validated_data['advanced_ocr_enabled'] # Update global scanner instance - global _scanner_instance - _scanner_instance = AIDocumentScanner(**config) + # WARNING: Not thread-safe. Consider storing configuration in database + # and reloading on each get_ai_scanner() call for production use + from documents import ai_scanner + ai_scanner._scanner_instance = AIDocumentScanner(**config) return Response({ "status": "success", - "message": "AI configuration updated" + "message": "AI configuration updated. Changes may require server restart for consistency." }) class DeletionApprovalView(GenericAPIView): """ API view to approve/reject deletion requests. - + Requires: can_approve_deletions permission """ @@ -3432,9 +3447,6 @@ class DeletionApprovalView(GenericAPIView): def post(self, request): """Approve or reject a deletion request.""" - from documents.models import DeletionRequest - from documents.serialisers import DeletionApprovalSerializer - serializer = DeletionApprovalSerializer(data=request.data) serializer.is_valid(raise_exception=True) @@ -3450,7 +3462,8 @@ class DeletionApprovalView(GenericAPIView): status=status.HTTP_404_NOT_FOUND ) - # Check if user has permission + # Permission is handled by the permission class; users with the permission + # can approve any deletion request. Additional ownership check for non-superusers. if deletion_request.user != request.user and not request.user.is_superuser: return Response( {"error": "Permission denied"}, @@ -3459,6 +3472,10 @@ class DeletionApprovalView(GenericAPIView): if action == "approve": deletion_request.status = DeletionRequest.STATUS_APPROVED + # TODO: Store approval reason for audit trail + # deletion_request.approval_reason = reason + # deletion_request.reviewed_at = timezone.now() + # deletion_request.reviewed_by = request.user deletion_request.save() # Perform the actual deletion @@ -3468,9 +3485,12 @@ class DeletionApprovalView(GenericAPIView): "message": "Deletion request approved", "request_id": request_id }) - - elif action == "reject": + else: # action == "reject" deletion_request.status = DeletionRequest.STATUS_REJECTED + # TODO: Store rejection reason for audit trail + # deletion_request.rejection_reason = reason + # deletion_request.reviewed_at = timezone.now() + # deletion_request.reviewed_by = request.user deletion_request.save() return Response({ @@ -3479,11 +3499,3 @@ class DeletionApprovalView(GenericAPIView): "request_id": request_id }) - -# Import the new permission classes -from documents.permissions import ( - CanViewAISuggestionsPermission, - CanApplyAISuggestionsPermission, - CanApproveDeletionsPermission, - CanConfigureAIPermission, -)