Merge pull request #42 from dawnsystem/copilot/add-granular-ai-permissions

Add granular permissions for AI features
This commit is contained in:
dawnsystem 2025-11-13 01:38:53 +01:00 committed by GitHub
commit 9c1cd2638d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 1713 additions and 0 deletions

View file

@ -0,0 +1,26 @@
# Generated migration for adding AI-related custom permissions
from django.db import migrations
class Migration(migrations.Migration):
dependencies = [
("documents", "1072_workflowtrigger_filter_custom_field_query_and_more"),
]
operations = [
migrations.AlterModelOptions(
name="document",
options={
"ordering": ("-created",),
"permissions": [
("can_view_ai_suggestions", "Can view AI suggestions"),
("can_apply_ai_suggestions", "Can apply AI suggestions"),
("can_approve_deletions", "Can approve AI-recommended deletions"),
("can_configure_ai", "Can configure AI settings"),
],
"verbose_name": "document",
"verbose_name_plural": "documents",
},
),
]

View file

@ -317,6 +317,12 @@ class Document(SoftDeleteModel, ModelWithOwner):
ordering = ("-created",)
verbose_name = _("document")
verbose_name_plural = _("documents")
permissions = [
("can_view_ai_suggestions", "Can view AI suggestions"),
("can_apply_ai_suggestions", "Can apply AI suggestions"),
("can_approve_deletions", "Can approve AI-recommended deletions"),
("can_configure_ai", "Can configure AI settings"),
]
def __str__(self) -> str:
created = self.created.isoformat()

View file

@ -219,3 +219,85 @@ class AcknowledgeTasksPermissions(BasePermission):
perms = self.perms_map.get(request.method, [])
return request.user.has_perms(perms)
class CanViewAISuggestionsPermission(BasePermission):
"""
Permission class to check if user can view AI suggestions.
This permission allows users to view AI scan results and suggestions
for documents, including tags, correspondents, document types, and
other metadata suggestions.
"""
def has_permission(self, request, view):
if not request.user or not request.user.is_authenticated:
return False
# Superusers always have permission
if request.user.is_superuser:
return True
# Check for specific permission
return request.user.has_perm("documents.can_view_ai_suggestions")
class CanApplyAISuggestionsPermission(BasePermission):
"""
Permission class to check if user can apply AI suggestions to documents.
This permission allows users to apply AI-generated suggestions to documents,
such as auto-applying tags, correspondents, document types, etc.
"""
def has_permission(self, request, view):
if not request.user or not request.user.is_authenticated:
return False
# Superusers always have permission
if request.user.is_superuser:
return True
# Check for specific permission
return request.user.has_perm("documents.can_apply_ai_suggestions")
class CanApproveDeletionsPermission(BasePermission):
"""
Permission class to check if user can approve AI-recommended deletions.
This permission is required to approve deletion requests initiated by AI,
ensuring that no documents are deleted without explicit user authorization.
"""
def has_permission(self, request, view):
if not request.user or not request.user.is_authenticated:
return False
# Superusers always have permission
if request.user.is_superuser:
return True
# Check for specific permission
return request.user.has_perm("documents.can_approve_deletions")
class CanConfigureAIPermission(BasePermission):
"""
Permission class to check if user can configure AI settings.
This permission allows users to configure AI scanner settings, including
confidence thresholds, auto-apply behavior, and ML feature toggles.
Typically restricted to administrators.
"""
def has_permission(self, request, view):
if not request.user or not request.user.is_authenticated:
return False
# Superusers always have permission
if request.user.is_superuser:
return True
# Check for specific permission
return request.user.has_perm("documents.can_configure_ai")

View file

@ -2696,3 +2696,125 @@ class StoragePathTestSerializer(SerializerWithPerms):
label="Document",
write_only=True,
)
class AISuggestionsRequestSerializer(serializers.Serializer):
"""Serializer for requesting AI suggestions for a document."""
document_id = serializers.IntegerField(
required=True,
label="Document ID",
help_text="ID of the document to analyze",
)
class AISuggestionSerializer(serializers.Serializer):
"""Serializer for a single AI suggestion."""
id = serializers.IntegerField()
name = serializers.CharField()
confidence = serializers.FloatField()
class AISuggestionsResponseSerializer(serializers.Serializer):
"""Serializer for AI suggestions response."""
document_id = serializers.IntegerField()
tags = AISuggestionSerializer(many=True, required=False)
correspondent = AISuggestionSerializer(required=False, allow_null=True)
document_type = AISuggestionSerializer(required=False, allow_null=True)
storage_path = AISuggestionSerializer(required=False, allow_null=True)
title_suggestion = serializers.CharField(required=False, allow_null=True)
custom_fields = serializers.DictField(required=False)
class ApplyAISuggestionsSerializer(serializers.Serializer):
"""Serializer for applying AI suggestions to a document."""
document_id = serializers.IntegerField(
required=True,
label="Document ID",
help_text="ID of the document to apply suggestions to",
)
apply_tags = serializers.BooleanField(
default=False,
label="Apply Tags",
help_text="Whether to apply tag suggestions",
)
apply_correspondent = serializers.BooleanField(
default=False,
label="Apply Correspondent",
help_text="Whether to apply correspondent suggestion",
)
apply_document_type = serializers.BooleanField(
default=False,
label="Apply Document Type",
help_text="Whether to apply document type suggestion",
)
apply_storage_path = serializers.BooleanField(
default=False,
label="Apply Storage Path",
help_text="Whether to apply storage path suggestion",
)
apply_title = serializers.BooleanField(
default=False,
label="Apply Title",
help_text="Whether to apply title suggestion",
)
selected_tags = serializers.ListField(
child=serializers.IntegerField(),
required=False,
label="Selected Tags",
help_text="Specific tag IDs to apply (optional)",
)
class AIConfigurationSerializer(serializers.Serializer):
"""Serializer for AI configuration settings."""
auto_apply_threshold = serializers.FloatField(
required=False,
min_value=0.0,
max_value=1.0,
label="Auto Apply Threshold",
help_text="Confidence threshold for automatic application (0.0-1.0)",
)
suggest_threshold = serializers.FloatField(
required=False,
min_value=0.0,
max_value=1.0,
label="Suggest Threshold",
help_text="Confidence threshold for suggestions (0.0-1.0)",
)
ml_enabled = serializers.BooleanField(
required=False,
label="ML Features Enabled",
help_text="Enable/disable ML features",
)
advanced_ocr_enabled = serializers.BooleanField(
required=False,
label="Advanced OCR Enabled",
help_text="Enable/disable advanced OCR features",
)
class DeletionApprovalSerializer(serializers.Serializer):
"""Serializer for approving/rejecting deletion requests."""
request_id = serializers.IntegerField(
required=True,
label="Request ID",
help_text="ID of the deletion request",
)
action = serializers.ChoiceField(
choices=["approve", "reject"],
required=True,
label="Action",
help_text="Action to take on the deletion request",
)
reason = serializers.CharField(
required=False,
allow_blank=True,
label="Reason",
help_text="Reason for approval/rejection (optional)",
)

View file

@ -0,0 +1,524 @@
"""
Unit tests for AI-related permissions.
Tests cover:
- CanViewAISuggestionsPermission
- CanApplyAISuggestionsPermission
- CanApproveDeletionsPermission
- CanConfigureAIPermission
- Role-based access control
- Permission assignment and verification
"""
from django.contrib.auth.models import Group, Permission, User
from django.contrib.contenttypes.models import ContentType
from django.test import TestCase
from rest_framework.test import APIRequestFactory
from documents.models import Document
from documents.permissions import (
CanApplyAISuggestionsPermission,
CanApproveDeletionsPermission,
CanConfigureAIPermission,
CanViewAISuggestionsPermission,
)
class MockView:
"""Mock view for testing permissions."""
pass
class TestCanViewAISuggestionsPermission(TestCase):
"""Test the CanViewAISuggestionsPermission class."""
def setUp(self):
"""Set up test users and permissions."""
self.factory = APIRequestFactory()
self.permission = CanViewAISuggestionsPermission()
self.view = MockView()
# Create users
self.superuser = User.objects.create_superuser(
username="admin", email="admin@test.com", password="admin123"
)
self.regular_user = User.objects.create_user(
username="regular", email="regular@test.com", password="regular123"
)
self.permitted_user = User.objects.create_user(
username="permitted", email="permitted@test.com", password="permitted123"
)
# Assign permission to permitted_user
content_type = ContentType.objects.get_for_model(Document)
permission, created = Permission.objects.get_or_create(
codename="can_view_ai_suggestions",
name="Can view AI suggestions",
content_type=content_type,
)
self.permitted_user.user_permissions.add(permission)
def test_unauthenticated_user_denied(self):
"""Test that unauthenticated users are denied."""
request = self.factory.get("/api/ai/suggestions/")
request.user = None
result = self.permission.has_permission(request, self.view)
self.assertFalse(result)
def test_superuser_allowed(self):
"""Test that superusers are always allowed."""
request = self.factory.get("/api/ai/suggestions/")
request.user = self.superuser
result = self.permission.has_permission(request, self.view)
self.assertTrue(result)
def test_regular_user_without_permission_denied(self):
"""Test that regular users without permission are denied."""
request = self.factory.get("/api/ai/suggestions/")
request.user = self.regular_user
result = self.permission.has_permission(request, self.view)
self.assertFalse(result)
def test_user_with_permission_allowed(self):
"""Test that users with permission are allowed."""
request = self.factory.get("/api/ai/suggestions/")
request.user = self.permitted_user
result = self.permission.has_permission(request, self.view)
self.assertTrue(result)
class TestCanApplyAISuggestionsPermission(TestCase):
"""Test the CanApplyAISuggestionsPermission class."""
def setUp(self):
"""Set up test users and permissions."""
self.factory = APIRequestFactory()
self.permission = CanApplyAISuggestionsPermission()
self.view = MockView()
# Create users
self.superuser = User.objects.create_superuser(
username="admin", email="admin@test.com", password="admin123"
)
self.regular_user = User.objects.create_user(
username="regular", email="regular@test.com", password="regular123"
)
self.permitted_user = User.objects.create_user(
username="permitted", email="permitted@test.com", password="permitted123"
)
# Assign permission to permitted_user
content_type = ContentType.objects.get_for_model(Document)
permission, created = Permission.objects.get_or_create(
codename="can_apply_ai_suggestions",
name="Can apply AI suggestions",
content_type=content_type,
)
self.permitted_user.user_permissions.add(permission)
def test_unauthenticated_user_denied(self):
"""Test that unauthenticated users are denied."""
request = self.factory.post("/api/ai/suggestions/apply/")
request.user = None
result = self.permission.has_permission(request, self.view)
self.assertFalse(result)
def test_superuser_allowed(self):
"""Test that superusers are always allowed."""
request = self.factory.post("/api/ai/suggestions/apply/")
request.user = self.superuser
result = self.permission.has_permission(request, self.view)
self.assertTrue(result)
def test_regular_user_without_permission_denied(self):
"""Test that regular users without permission are denied."""
request = self.factory.post("/api/ai/suggestions/apply/")
request.user = self.regular_user
result = self.permission.has_permission(request, self.view)
self.assertFalse(result)
def test_user_with_permission_allowed(self):
"""Test that users with permission are allowed."""
request = self.factory.post("/api/ai/suggestions/apply/")
request.user = self.permitted_user
result = self.permission.has_permission(request, self.view)
self.assertTrue(result)
class TestCanApproveDeletionsPermission(TestCase):
"""Test the CanApproveDeletionsPermission class."""
def setUp(self):
"""Set up test users and permissions."""
self.factory = APIRequestFactory()
self.permission = CanApproveDeletionsPermission()
self.view = MockView()
# Create users
self.superuser = User.objects.create_superuser(
username="admin", email="admin@test.com", password="admin123"
)
self.regular_user = User.objects.create_user(
username="regular", email="regular@test.com", password="regular123"
)
self.permitted_user = User.objects.create_user(
username="permitted", email="permitted@test.com", password="permitted123"
)
# Assign permission to permitted_user
content_type = ContentType.objects.get_for_model(Document)
permission, created = Permission.objects.get_or_create(
codename="can_approve_deletions",
name="Can approve AI-recommended deletions",
content_type=content_type,
)
self.permitted_user.user_permissions.add(permission)
def test_unauthenticated_user_denied(self):
"""Test that unauthenticated users are denied."""
request = self.factory.post("/api/ai/deletions/approve/")
request.user = None
result = self.permission.has_permission(request, self.view)
self.assertFalse(result)
def test_superuser_allowed(self):
"""Test that superusers are always allowed."""
request = self.factory.post("/api/ai/deletions/approve/")
request.user = self.superuser
result = self.permission.has_permission(request, self.view)
self.assertTrue(result)
def test_regular_user_without_permission_denied(self):
"""Test that regular users without permission are denied."""
request = self.factory.post("/api/ai/deletions/approve/")
request.user = self.regular_user
result = self.permission.has_permission(request, self.view)
self.assertFalse(result)
def test_user_with_permission_allowed(self):
"""Test that users with permission are allowed."""
request = self.factory.post("/api/ai/deletions/approve/")
request.user = self.permitted_user
result = self.permission.has_permission(request, self.view)
self.assertTrue(result)
class TestCanConfigureAIPermission(TestCase):
"""Test the CanConfigureAIPermission class."""
def setUp(self):
"""Set up test users and permissions."""
self.factory = APIRequestFactory()
self.permission = CanConfigureAIPermission()
self.view = MockView()
# Create users
self.superuser = User.objects.create_superuser(
username="admin", email="admin@test.com", password="admin123"
)
self.regular_user = User.objects.create_user(
username="regular", email="regular@test.com", password="regular123"
)
self.permitted_user = User.objects.create_user(
username="permitted", email="permitted@test.com", password="permitted123"
)
# Assign permission to permitted_user
content_type = ContentType.objects.get_for_model(Document)
permission, created = Permission.objects.get_or_create(
codename="can_configure_ai",
name="Can configure AI settings",
content_type=content_type,
)
self.permitted_user.user_permissions.add(permission)
def test_unauthenticated_user_denied(self):
"""Test that unauthenticated users are denied."""
request = self.factory.post("/api/ai/config/")
request.user = None
result = self.permission.has_permission(request, self.view)
self.assertFalse(result)
def test_superuser_allowed(self):
"""Test that superusers are always allowed."""
request = self.factory.post("/api/ai/config/")
request.user = self.superuser
result = self.permission.has_permission(request, self.view)
self.assertTrue(result)
def test_regular_user_without_permission_denied(self):
"""Test that regular users without permission are denied."""
request = self.factory.post("/api/ai/config/")
request.user = self.regular_user
result = self.permission.has_permission(request, self.view)
self.assertFalse(result)
def test_user_with_permission_allowed(self):
"""Test that users with permission are allowed."""
request = self.factory.post("/api/ai/config/")
request.user = self.permitted_user
result = self.permission.has_permission(request, self.view)
self.assertTrue(result)
class TestRoleBasedAccessControl(TestCase):
"""Test role-based access control for AI permissions."""
def setUp(self):
"""Set up test groups and permissions."""
# Create groups
self.viewer_group = Group.objects.create(name="AI Viewers")
self.editor_group = Group.objects.create(name="AI Editors")
self.admin_group = Group.objects.create(name="AI Administrators")
# Get permissions
content_type = ContentType.objects.get_for_model(Document)
self.view_permission, _ = Permission.objects.get_or_create(
codename="can_view_ai_suggestions",
name="Can view AI suggestions",
content_type=content_type,
)
self.apply_permission, _ = Permission.objects.get_or_create(
codename="can_apply_ai_suggestions",
name="Can apply AI suggestions",
content_type=content_type,
)
self.approve_permission, _ = Permission.objects.get_or_create(
codename="can_approve_deletions",
name="Can approve AI-recommended deletions",
content_type=content_type,
)
self.config_permission, _ = Permission.objects.get_or_create(
codename="can_configure_ai",
name="Can configure AI settings",
content_type=content_type,
)
# Assign permissions to groups
# Viewers can only view
self.viewer_group.permissions.add(self.view_permission)
# Editors can view and apply
self.editor_group.permissions.add(self.view_permission, self.apply_permission)
# Admins can do everything
self.admin_group.permissions.add(
self.view_permission,
self.apply_permission,
self.approve_permission,
self.config_permission,
)
def test_viewer_role_permissions(self):
"""Test that viewer role has appropriate permissions."""
user = User.objects.create_user(
username="viewer", email="viewer@test.com", password="viewer123"
)
user.groups.add(self.viewer_group)
# Refresh user to get updated permissions
user = User.objects.get(pk=user.pk)
self.assertTrue(user.has_perm("documents.can_view_ai_suggestions"))
self.assertFalse(user.has_perm("documents.can_apply_ai_suggestions"))
self.assertFalse(user.has_perm("documents.can_approve_deletions"))
self.assertFalse(user.has_perm("documents.can_configure_ai"))
def test_editor_role_permissions(self):
"""Test that editor role has appropriate permissions."""
user = User.objects.create_user(
username="editor", email="editor@test.com", password="editor123"
)
user.groups.add(self.editor_group)
# Refresh user to get updated permissions
user = User.objects.get(pk=user.pk)
self.assertTrue(user.has_perm("documents.can_view_ai_suggestions"))
self.assertTrue(user.has_perm("documents.can_apply_ai_suggestions"))
self.assertFalse(user.has_perm("documents.can_approve_deletions"))
self.assertFalse(user.has_perm("documents.can_configure_ai"))
def test_admin_role_permissions(self):
"""Test that admin role has all permissions."""
user = User.objects.create_user(
username="ai_admin", email="ai_admin@test.com", password="admin123"
)
user.groups.add(self.admin_group)
# Refresh user to get updated permissions
user = User.objects.get(pk=user.pk)
self.assertTrue(user.has_perm("documents.can_view_ai_suggestions"))
self.assertTrue(user.has_perm("documents.can_apply_ai_suggestions"))
self.assertTrue(user.has_perm("documents.can_approve_deletions"))
self.assertTrue(user.has_perm("documents.can_configure_ai"))
def test_user_with_multiple_groups(self):
"""Test that user permissions accumulate from multiple groups."""
user = User.objects.create_user(
username="multi_role", email="multi@test.com", password="multi123"
)
user.groups.add(self.viewer_group, self.editor_group)
# Refresh user to get updated permissions
user = User.objects.get(pk=user.pk)
# Should have both viewer and editor permissions
self.assertTrue(user.has_perm("documents.can_view_ai_suggestions"))
self.assertTrue(user.has_perm("documents.can_apply_ai_suggestions"))
self.assertFalse(user.has_perm("documents.can_approve_deletions"))
def test_direct_permission_assignment_overrides_group(self):
"""Test that direct permission assignment works alongside group permissions."""
user = User.objects.create_user(
username="special", email="special@test.com", password="special123"
)
user.groups.add(self.viewer_group)
# Directly assign approval permission
user.user_permissions.add(self.approve_permission)
# Refresh user to get updated permissions
user = User.objects.get(pk=user.pk)
# Should have viewer group permissions plus direct permission
self.assertTrue(user.has_perm("documents.can_view_ai_suggestions"))
self.assertFalse(user.has_perm("documents.can_apply_ai_suggestions"))
self.assertTrue(user.has_perm("documents.can_approve_deletions"))
self.assertFalse(user.has_perm("documents.can_configure_ai"))
class TestPermissionAssignment(TestCase):
"""Test permission assignment and revocation."""
def setUp(self):
"""Set up test user."""
self.user = User.objects.create_user(
username="testuser", email="test@test.com", password="test123"
)
content_type = ContentType.objects.get_for_model(Document)
self.view_permission, _ = Permission.objects.get_or_create(
codename="can_view_ai_suggestions",
name="Can view AI suggestions",
content_type=content_type,
)
def test_assign_permission_to_user(self):
"""Test assigning permission to user."""
self.assertFalse(self.user.has_perm("documents.can_view_ai_suggestions"))
self.user.user_permissions.add(self.view_permission)
self.user = User.objects.get(pk=self.user.pk)
self.assertTrue(self.user.has_perm("documents.can_view_ai_suggestions"))
def test_revoke_permission_from_user(self):
"""Test revoking permission from user."""
self.user.user_permissions.add(self.view_permission)
self.user = User.objects.get(pk=self.user.pk)
self.assertTrue(self.user.has_perm("documents.can_view_ai_suggestions"))
self.user.user_permissions.remove(self.view_permission)
self.user = User.objects.get(pk=self.user.pk)
self.assertFalse(self.user.has_perm("documents.can_view_ai_suggestions"))
def test_permission_persistence(self):
"""Test that permissions persist across user retrieval."""
self.user.user_permissions.add(self.view_permission)
# Get user from database
retrieved_user = User.objects.get(username="testuser")
self.assertTrue(retrieved_user.has_perm("documents.can_view_ai_suggestions"))
class TestPermissionEdgeCases(TestCase):
"""Test edge cases and error conditions for permissions."""
def setUp(self):
"""Set up test data."""
self.factory = APIRequestFactory()
self.view = MockView()
def test_anonymous_user_request(self):
"""Test handling of anonymous user."""
from django.contrib.auth.models import AnonymousUser
permission = CanViewAISuggestionsPermission()
request = self.factory.get("/api/ai/suggestions/")
request.user = AnonymousUser()
result = permission.has_permission(request, self.view)
self.assertFalse(result)
def test_missing_user_attribute(self):
"""Test handling of request without user attribute."""
permission = CanViewAISuggestionsPermission()
request = self.factory.get("/api/ai/suggestions/")
# Don't set request.user
result = permission.has_permission(request, self.view)
self.assertFalse(result)
def test_inactive_user_with_permission(self):
"""Test that inactive users are denied even with permission."""
user = User.objects.create_user(
username="inactive", email="inactive@test.com", password="inactive123"
)
user.is_active = False
user.save()
# Add permission
content_type = ContentType.objects.get_for_model(Document)
permission, _ = Permission.objects.get_or_create(
codename="can_view_ai_suggestions",
name="Can view AI suggestions",
content_type=content_type,
)
user.user_permissions.add(permission)
permission_check = CanViewAISuggestionsPermission()
request = self.factory.get("/api/ai/suggestions/")
request.user = user
# Inactive users should not pass authentication check
result = permission_check.has_permission(request, self.view)
self.assertFalse(result)

View file

@ -0,0 +1,573 @@
"""
Integration tests for AI API endpoints.
Tests cover:
- AI suggestions endpoint (POST /api/ai/suggestions/)
- Apply AI suggestions endpoint (POST /api/ai/suggestions/apply/)
- AI configuration endpoint (GET/POST /api/ai/config/)
- Deletion approval endpoint (POST /api/ai/deletions/approve/)
- Permission checks for all endpoints
- Request/response validation
"""
from unittest import mock
from django.contrib.auth.models import Permission, User
from django.contrib.contenttypes.models import ContentType
from rest_framework import status
from rest_framework.test import APITestCase
from documents.models import (
Correspondent,
DeletionRequest,
Document,
DocumentType,
Tag,
)
from documents.tests.utils import DirectoriesMixin
class TestAISuggestionsEndpoint(DirectoriesMixin, APITestCase):
"""Test the AI suggestions endpoint."""
def setUp(self):
"""Set up test data."""
super().setUp()
# Create users
self.superuser = User.objects.create_superuser(
username="admin", email="admin@test.com", password="admin123"
)
self.user_with_permission = User.objects.create_user(
username="permitted", email="permitted@test.com", password="permitted123"
)
self.user_without_permission = User.objects.create_user(
username="regular", email="regular@test.com", password="regular123"
)
# Assign view permission
content_type = ContentType.objects.get_for_model(Document)
view_permission, _ = Permission.objects.get_or_create(
codename="can_view_ai_suggestions",
name="Can view AI suggestions",
content_type=content_type,
)
self.user_with_permission.user_permissions.add(view_permission)
# Create test document
self.document = Document.objects.create(
title="Test Document",
content="This is a test invoice from ACME Corporation"
)
# Create test metadata objects
self.tag = Tag.objects.create(name="Invoice")
self.correspondent = Correspondent.objects.create(name="ACME Corp")
self.doc_type = DocumentType.objects.create(name="Invoice")
def test_unauthorized_access_denied(self):
"""Test that unauthenticated users are denied."""
response = self.client.post(
"/api/ai/suggestions/",
{"document_id": self.document.id},
format="json"
)
self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)
def test_user_without_permission_denied(self):
"""Test that users without permission are denied."""
self.client.force_authenticate(user=self.user_without_permission)
response = self.client.post(
"/api/ai/suggestions/",
{"document_id": self.document.id},
format="json"
)
self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
def test_superuser_allowed(self):
"""Test that superusers can access the endpoint."""
self.client.force_authenticate(user=self.superuser)
with mock.patch('documents.views.get_ai_scanner') as mock_scanner:
# Mock the scanner response
mock_scan_result = mock.MagicMock()
mock_scan_result.tags = [(self.tag.id, 0.85)]
mock_scan_result.correspondent = (self.correspondent.id, 0.90)
mock_scan_result.document_type = (self.doc_type.id, 0.80)
mock_scan_result.storage_path = None
mock_scan_result.title_suggestion = "Invoice - ACME Corp"
mock_scan_result.custom_fields = {}
mock_scanner_instance = mock.MagicMock()
mock_scanner_instance.scan_document.return_value = mock_scan_result
mock_scanner.return_value = mock_scanner_instance
response = self.client.post(
"/api/ai/suggestions/",
{"document_id": self.document.id},
format="json"
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertIn("document_id", response.data)
self.assertEqual(response.data["document_id"], self.document.id)
def test_user_with_permission_allowed(self):
"""Test that users with permission can access the endpoint."""
self.client.force_authenticate(user=self.user_with_permission)
with mock.patch('documents.views.get_ai_scanner') as mock_scanner:
# Mock the scanner response
mock_scan_result = mock.MagicMock()
mock_scan_result.tags = []
mock_scan_result.correspondent = None
mock_scan_result.document_type = None
mock_scan_result.storage_path = None
mock_scan_result.title_suggestion = None
mock_scan_result.custom_fields = {}
mock_scanner_instance = mock.MagicMock()
mock_scanner_instance.scan_document.return_value = mock_scan_result
mock_scanner.return_value = mock_scanner_instance
response = self.client.post(
"/api/ai/suggestions/",
{"document_id": self.document.id},
format="json"
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
def test_invalid_document_id(self):
"""Test handling of invalid document ID."""
self.client.force_authenticate(user=self.superuser)
response = self.client.post(
"/api/ai/suggestions/",
{"document_id": 99999},
format="json"
)
self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
def test_missing_document_id(self):
"""Test handling of missing document ID."""
self.client.force_authenticate(user=self.superuser)
response = self.client.post(
"/api/ai/suggestions/",
{},
format="json"
)
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
class TestApplyAISuggestionsEndpoint(DirectoriesMixin, APITestCase):
"""Test the apply AI suggestions endpoint."""
def setUp(self):
"""Set up test data."""
super().setUp()
# Create users
self.superuser = User.objects.create_superuser(
username="admin", email="admin@test.com", password="admin123"
)
self.user_with_permission = User.objects.create_user(
username="permitted", email="permitted@test.com", password="permitted123"
)
# Assign apply permission
content_type = ContentType.objects.get_for_model(Document)
apply_permission, _ = Permission.objects.get_or_create(
codename="can_apply_ai_suggestions",
name="Can apply AI suggestions",
content_type=content_type,
)
self.user_with_permission.user_permissions.add(apply_permission)
# Create test document
self.document = Document.objects.create(
title="Test Document",
content="Test content"
)
# Create test metadata
self.tag = Tag.objects.create(name="Test Tag")
self.correspondent = Correspondent.objects.create(name="Test Corp")
def test_unauthorized_access_denied(self):
"""Test that unauthenticated users are denied."""
response = self.client.post(
"/api/ai/suggestions/apply/",
{"document_id": self.document.id},
format="json"
)
self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)
def test_apply_tags_success(self):
"""Test successfully applying tag suggestions."""
self.client.force_authenticate(user=self.superuser)
with mock.patch('documents.views.get_ai_scanner') as mock_scanner:
# Mock the scanner response
mock_scan_result = mock.MagicMock()
mock_scan_result.tags = [(self.tag.id, 0.85)]
mock_scan_result.correspondent = None
mock_scan_result.document_type = None
mock_scan_result.storage_path = None
mock_scan_result.title_suggestion = None
mock_scan_result.custom_fields = {}
mock_scanner_instance = mock.MagicMock()
mock_scanner_instance.scan_document.return_value = mock_scan_result
mock_scanner_instance.auto_apply_threshold = 0.80
mock_scanner.return_value = mock_scanner_instance
response = self.client.post(
"/api/ai/suggestions/apply/",
{
"document_id": self.document.id,
"apply_tags": True
},
format="json"
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(response.data["status"], "success")
def test_apply_correspondent_success(self):
"""Test successfully applying correspondent suggestion."""
self.client.force_authenticate(user=self.superuser)
with mock.patch('documents.views.get_ai_scanner') as mock_scanner:
# Mock the scanner response
mock_scan_result = mock.MagicMock()
mock_scan_result.tags = []
mock_scan_result.correspondent = (self.correspondent.id, 0.90)
mock_scan_result.document_type = None
mock_scan_result.storage_path = None
mock_scan_result.title_suggestion = None
mock_scan_result.custom_fields = {}
mock_scanner_instance = mock.MagicMock()
mock_scanner_instance.scan_document.return_value = mock_scan_result
mock_scanner_instance.auto_apply_threshold = 0.80
mock_scanner.return_value = mock_scanner_instance
response = self.client.post(
"/api/ai/suggestions/apply/",
{
"document_id": self.document.id,
"apply_correspondent": True
},
format="json"
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
# Verify correspondent was applied
self.document.refresh_from_db()
self.assertEqual(self.document.correspondent, self.correspondent)
class TestAIConfigurationEndpoint(DirectoriesMixin, APITestCase):
"""Test the AI configuration endpoint."""
def setUp(self):
"""Set up test data."""
super().setUp()
# Create users
self.superuser = User.objects.create_superuser(
username="admin", email="admin@test.com", password="admin123"
)
self.user_without_permission = User.objects.create_user(
username="regular", email="regular@test.com", password="regular123"
)
def test_unauthorized_access_denied(self):
"""Test that unauthenticated users are denied."""
response = self.client.get("/api/ai/config/")
self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)
def test_user_without_permission_denied(self):
"""Test that users without permission are denied."""
self.client.force_authenticate(user=self.user_without_permission)
response = self.client.get("/api/ai/config/")
self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
def test_get_config_success(self):
"""Test getting AI configuration."""
self.client.force_authenticate(user=self.superuser)
with mock.patch('documents.views.get_ai_scanner') as mock_scanner:
mock_scanner_instance = mock.MagicMock()
mock_scanner_instance.auto_apply_threshold = 0.80
mock_scanner_instance.suggest_threshold = 0.60
mock_scanner_instance.ml_enabled = True
mock_scanner_instance.advanced_ocr_enabled = True
mock_scanner.return_value = mock_scanner_instance
response = self.client.get("/api/ai/config/")
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertIn("auto_apply_threshold", response.data)
self.assertEqual(response.data["auto_apply_threshold"], 0.80)
def test_update_config_success(self):
"""Test updating AI configuration."""
self.client.force_authenticate(user=self.superuser)
response = self.client.post(
"/api/ai/config/",
{
"auto_apply_threshold": 0.90,
"suggest_threshold": 0.70
},
format="json"
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(response.data["status"], "success")
def test_update_config_invalid_threshold(self):
"""Test updating with invalid threshold value."""
self.client.force_authenticate(user=self.superuser)
response = self.client.post(
"/api/ai/config/",
{
"auto_apply_threshold": 1.5 # Invalid: > 1.0
},
format="json"
)
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
class TestDeletionApprovalEndpoint(DirectoriesMixin, APITestCase):
"""Test the deletion approval endpoint."""
def setUp(self):
"""Set up test data."""
super().setUp()
# Create users
self.superuser = User.objects.create_superuser(
username="admin", email="admin@test.com", password="admin123"
)
self.user_with_permission = User.objects.create_user(
username="permitted", email="permitted@test.com", password="permitted123"
)
self.user_without_permission = User.objects.create_user(
username="regular", email="regular@test.com", password="regular123"
)
# Assign approval permission
content_type = ContentType.objects.get_for_model(Document)
approval_permission, _ = Permission.objects.get_or_create(
codename="can_approve_deletions",
name="Can approve AI-recommended deletions",
content_type=content_type,
)
self.user_with_permission.user_permissions.add(approval_permission)
# Create test deletion request
self.deletion_request = DeletionRequest.objects.create(
user=self.user_with_permission,
requested_by_ai=True,
ai_reason="Document appears to be a duplicate"
)
def test_unauthorized_access_denied(self):
"""Test that unauthenticated users are denied."""
response = self.client.post(
"/api/ai/deletions/approve/",
{
"request_id": self.deletion_request.id,
"action": "approve"
},
format="json"
)
self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)
def test_user_without_permission_denied(self):
"""Test that users without permission are denied."""
self.client.force_authenticate(user=self.user_without_permission)
response = self.client.post(
"/api/ai/deletions/approve/",
{
"request_id": self.deletion_request.id,
"action": "approve"
},
format="json"
)
self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
def test_approve_deletion_success(self):
"""Test successfully approving a deletion request."""
self.client.force_authenticate(user=self.user_with_permission)
response = self.client.post(
"/api/ai/deletions/approve/",
{
"request_id": self.deletion_request.id,
"action": "approve"
},
format="json"
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(response.data["status"], "success")
# Verify status was updated
self.deletion_request.refresh_from_db()
self.assertEqual(
self.deletion_request.status,
DeletionRequest.STATUS_APPROVED
)
def test_reject_deletion_success(self):
"""Test successfully rejecting a deletion request."""
self.client.force_authenticate(user=self.user_with_permission)
response = self.client.post(
"/api/ai/deletions/approve/",
{
"request_id": self.deletion_request.id,
"action": "reject",
"reason": "Document is still needed"
},
format="json"
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
# Verify status was updated
self.deletion_request.refresh_from_db()
self.assertEqual(
self.deletion_request.status,
DeletionRequest.STATUS_REJECTED
)
def test_invalid_request_id(self):
"""Test handling of invalid deletion request ID."""
self.client.force_authenticate(user=self.superuser)
response = self.client.post(
"/api/ai/deletions/approve/",
{
"request_id": 99999,
"action": "approve"
},
format="json"
)
self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
def test_superuser_can_approve_any_request(self):
"""Test that superusers can approve any deletion request."""
self.client.force_authenticate(user=self.superuser)
response = self.client.post(
"/api/ai/deletions/approve/",
{
"request_id": self.deletion_request.id,
"action": "approve"
},
format="json"
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
class TestEndpointPermissionIntegration(DirectoriesMixin, APITestCase):
"""Test permission integration across all AI endpoints."""
def setUp(self):
"""Set up test data."""
super().setUp()
# Create user with all AI permissions
self.power_user = User.objects.create_user(
username="power_user", email="power@test.com", password="power123"
)
content_type = ContentType.objects.get_for_model(Document)
# Assign all AI permissions
permissions = [
"can_view_ai_suggestions",
"can_apply_ai_suggestions",
"can_approve_deletions",
"can_configure_ai",
]
for codename in permissions:
perm, _ = Permission.objects.get_or_create(
codename=codename,
name=f"Can {codename.replace('_', ' ')}",
content_type=content_type,
)
self.power_user.user_permissions.add(perm)
self.document = Document.objects.create(
title="Test Doc",
content="Test"
)
def test_power_user_can_access_all_endpoints(self):
"""Test that user with all permissions can access all endpoints."""
self.client.force_authenticate(user=self.power_user)
# Test suggestions endpoint
with mock.patch('documents.views.get_ai_scanner') as mock_scanner:
mock_scan_result = mock.MagicMock()
mock_scan_result.tags = []
mock_scan_result.correspondent = None
mock_scan_result.document_type = None
mock_scan_result.storage_path = None
mock_scan_result.title_suggestion = None
mock_scan_result.custom_fields = {}
mock_scanner_instance = mock.MagicMock()
mock_scanner_instance.scan_document.return_value = mock_scan_result
mock_scanner_instance.auto_apply_threshold = 0.80
mock_scanner_instance.suggest_threshold = 0.60
mock_scanner_instance.ml_enabled = True
mock_scanner_instance.advanced_ocr_enabled = True
mock_scanner.return_value = mock_scanner_instance
response1 = self.client.post(
"/api/ai/suggestions/",
{"document_id": self.document.id},
format="json"
)
self.assertEqual(response1.status_code, status.HTTP_200_OK)
# Test apply endpoint
response2 = self.client.post(
"/api/ai/suggestions/apply/",
{
"document_id": self.document.id,
"apply_tags": False
},
format="json"
)
self.assertEqual(response2.status_code, status.HTTP_200_OK)
# Test config endpoint
response3 = self.client.get("/api/ai/config/")
self.assertEqual(response3.status_code, status.HTTP_200_OK)

View file

@ -69,6 +69,7 @@ from packaging import version as packaging_version
from redis import Redis
from rest_framework import parsers
from rest_framework import serializers
from rest_framework import status
from rest_framework.decorators import action
from rest_framework.exceptions import NotFound
from rest_framework.exceptions import ValidationError
@ -127,6 +128,7 @@ from documents.matching import match_storage_paths
from documents.matching import match_tags
from documents.models import Correspondent
from documents.models import CustomField
from documents.models import DeletionRequest
from documents.models import Document
from documents.models import DocumentType
from documents.models import Note
@ -139,9 +141,15 @@ from documents.models import UiSettings
from documents.models import Workflow
from documents.models import WorkflowAction
from documents.models import WorkflowTrigger
from documents.ai_scanner import AIDocumentScanner
from documents.ai_scanner import get_ai_scanner
from documents.parsers import get_parser_class_for_mime_type
from documents.parsers import parse_date_generator
from documents.permissions import AcknowledgeTasksPermissions
from documents.permissions import CanApplyAISuggestionsPermission
from documents.permissions import CanApproveDeletionsPermission
from documents.permissions import CanConfigureAIPermission
from documents.permissions import CanViewAISuggestionsPermission
from documents.permissions import PaperlessAdminPermissions
from documents.permissions import PaperlessNotePermissions
from documents.permissions import PaperlessObjectPermissions
@ -152,11 +160,16 @@ from documents.permissions import has_perms_owner_aware
from documents.permissions import set_permissions_for_object
from documents.schema import generate_object_with_permissions_schema
from documents.serialisers import AcknowledgeTasksViewSerializer
from documents.serialisers import AIConfigurationSerializer
from documents.serialisers import AISuggestionsRequestSerializer
from documents.serialisers import AISuggestionsResponseSerializer
from documents.serialisers import ApplyAISuggestionsSerializer
from documents.serialisers import BulkDownloadSerializer
from documents.serialisers import BulkEditObjectsSerializer
from documents.serialisers import BulkEditSerializer
from documents.serialisers import CorrespondentSerializer
from documents.serialisers import CustomFieldSerializer
from documents.serialisers import DeletionApprovalSerializer
from documents.serialisers import DocumentListSerializer
from documents.serialisers import DocumentSerializer
from documents.serialisers import DocumentTypeSerializer
@ -3150,3 +3163,339 @@ def serve_logo(request, filename=None):
filename=app_logo.name,
as_attachment=True,
)
class AISuggestionsView(GenericAPIView):
"""
API view to get AI suggestions for a document.
Requires: can_view_ai_suggestions permission
"""
permission_classes = [IsAuthenticated, CanViewAISuggestionsPermission]
serializer_class = AISuggestionsResponseSerializer
def post(self, request):
"""Get AI suggestions for a document."""
# Validate request
request_serializer = AISuggestionsRequestSerializer(data=request.data)
request_serializer.is_valid(raise_exception=True)
document_id = request_serializer.validated_data['document_id']
try:
document = Document.objects.get(pk=document_id)
except Document.DoesNotExist:
return Response(
{"error": "Document not found or you don't have permission to view it"},
status=status.HTTP_404_NOT_FOUND
)
# Check if user has permission to view this document
if not has_perms_owner_aware(request.user, 'documents.view_document', document):
return Response(
{"error": "Permission denied"},
status=status.HTTP_403_FORBIDDEN
)
# Get AI scanner and scan document
scanner = get_ai_scanner()
scan_result = scanner.scan_document(document, document.content or "")
# Build response
response_data = {
"document_id": document.id,
"tags": [],
"correspondent": None,
"document_type": None,
"storage_path": None,
"title_suggestion": scan_result.title_suggestion,
"custom_fields": {}
}
# Format tag suggestions
for tag_id, confidence in scan_result.tags:
try:
tag = Tag.objects.get(pk=tag_id)
response_data["tags"].append({
"id": tag.id,
"name": tag.name,
"confidence": confidence
})
except Tag.DoesNotExist:
# Tag was suggested by AI but no longer exists; skip it
pass
# Format correspondent suggestion
if scan_result.correspondent:
corr_id, confidence = scan_result.correspondent
try:
correspondent = Correspondent.objects.get(pk=corr_id)
response_data["correspondent"] = {
"id": correspondent.id,
"name": correspondent.name,
"confidence": confidence
}
except Correspondent.DoesNotExist:
# Correspondent was suggested but no longer exists; skip it
pass
# Format document type suggestion
if scan_result.document_type:
type_id, confidence = scan_result.document_type
try:
doc_type = DocumentType.objects.get(pk=type_id)
response_data["document_type"] = {
"id": doc_type.id,
"name": doc_type.name,
"confidence": confidence
}
except DocumentType.DoesNotExist:
# Document type was suggested but no longer exists; skip it
pass
# Format storage path suggestion
if scan_result.storage_path:
path_id, confidence = scan_result.storage_path
try:
storage_path = StoragePath.objects.get(pk=path_id)
response_data["storage_path"] = {
"id": storage_path.id,
"name": storage_path.name,
"confidence": confidence
}
except StoragePath.DoesNotExist:
# Storage path was suggested but no longer exists; skip it
pass
# Format custom fields
for field_id, (value, confidence) in scan_result.custom_fields.items():
response_data["custom_fields"][str(field_id)] = {
"value": value,
"confidence": confidence
}
return Response(response_data)
class ApplyAISuggestionsView(GenericAPIView):
"""
API view to apply AI suggestions to a document.
Requires: can_apply_ai_suggestions permission
"""
permission_classes = [IsAuthenticated, CanApplyAISuggestionsPermission]
def post(self, request):
"""Apply AI suggestions to a document."""
# Validate request
serializer = ApplyAISuggestionsSerializer(data=request.data)
serializer.is_valid(raise_exception=True)
document_id = serializer.validated_data['document_id']
try:
document = Document.objects.get(pk=document_id)
except Document.DoesNotExist:
return Response(
{"error": "Document not found"},
status=status.HTTP_404_NOT_FOUND
)
# Check if user has permission to change this document
if not has_perms_owner_aware(request.user, 'documents.change_document', document):
return Response(
{"error": "Permission denied"},
status=status.HTTP_403_FORBIDDEN
)
# Get AI scanner and scan document
scanner = get_ai_scanner()
scan_result = scanner.scan_document(document, document.content or "")
# Apply suggestions based on user selections
applied = []
if serializer.validated_data.get('apply_tags'):
selected_tags = serializer.validated_data.get('selected_tags', [])
if selected_tags:
# Apply only selected tags
tags_to_apply = [tag_id for tag_id, _ in scan_result.tags if tag_id in selected_tags]
else:
# Apply all high-confidence tags
tags_to_apply = [tag_id for tag_id, conf in scan_result.tags if conf >= scanner.auto_apply_threshold]
for tag_id in tags_to_apply:
try:
tag = Tag.objects.get(pk=tag_id)
document.add_nested_tags([tag])
applied.append(f"tag: {tag.name}")
except Tag.DoesNotExist:
# Tag not found; skip applying this tag
pass
if serializer.validated_data.get('apply_correspondent') and scan_result.correspondent:
corr_id, confidence = scan_result.correspondent
try:
correspondent = Correspondent.objects.get(pk=corr_id)
document.correspondent = correspondent
applied.append(f"correspondent: {correspondent.name}")
except Correspondent.DoesNotExist:
# Correspondent not found; skip applying
pass
if serializer.validated_data.get('apply_document_type') and scan_result.document_type:
type_id, confidence = scan_result.document_type
try:
doc_type = DocumentType.objects.get(pk=type_id)
document.document_type = doc_type
applied.append(f"document_type: {doc_type.name}")
except DocumentType.DoesNotExist:
# Document type not found; skip applying
pass
if serializer.validated_data.get('apply_storage_path') and scan_result.storage_path:
path_id, confidence = scan_result.storage_path
try:
storage_path = StoragePath.objects.get(pk=path_id)
document.storage_path = storage_path
applied.append(f"storage_path: {storage_path.name}")
except StoragePath.DoesNotExist:
# Storage path not found; skip applying
pass
if serializer.validated_data.get('apply_title') and scan_result.title_suggestion:
document.title = scan_result.title_suggestion
applied.append(f"title: {scan_result.title_suggestion}")
# Save document
document.save()
return Response({
"status": "success",
"document_id": document.id,
"applied": applied
})
class AIConfigurationView(GenericAPIView):
"""
API view to get/update AI configuration.
Requires: can_configure_ai permission
"""
permission_classes = [IsAuthenticated, CanConfigureAIPermission]
def get(self, request):
"""Get current AI configuration."""
scanner = get_ai_scanner()
config_data = {
"auto_apply_threshold": scanner.auto_apply_threshold,
"suggest_threshold": scanner.suggest_threshold,
"ml_enabled": scanner.ml_enabled,
"advanced_ocr_enabled": scanner.advanced_ocr_enabled,
}
serializer = AIConfigurationSerializer(config_data)
return Response(serializer.data)
def post(self, request):
"""
Update AI configuration.
Note: This updates the global scanner instance. Configuration changes
will take effect immediately but may require server restart in production
environments for consistency across workers.
"""
serializer = AIConfigurationSerializer(data=request.data)
serializer.is_valid(raise_exception=True)
# Create new scanner with updated configuration
config = {}
if 'auto_apply_threshold' in serializer.validated_data:
config['auto_apply_threshold'] = serializer.validated_data['auto_apply_threshold']
if 'suggest_threshold' in serializer.validated_data:
config['suggest_threshold'] = serializer.validated_data['suggest_threshold']
if 'ml_enabled' in serializer.validated_data:
config['enable_ml_features'] = serializer.validated_data['ml_enabled']
if 'advanced_ocr_enabled' in serializer.validated_data:
config['enable_advanced_ocr'] = serializer.validated_data['advanced_ocr_enabled']
# Update global scanner instance
# WARNING: Not thread-safe. Consider storing configuration in database
# and reloading on each get_ai_scanner() call for production use
from documents import ai_scanner
ai_scanner._scanner_instance = AIDocumentScanner(**config)
return Response({
"status": "success",
"message": "AI configuration updated. Changes may require server restart for consistency."
})
class DeletionApprovalView(GenericAPIView):
"""
API view to approve/reject deletion requests.
Requires: can_approve_deletions permission
"""
permission_classes = [IsAuthenticated, CanApproveDeletionsPermission]
def post(self, request):
"""Approve or reject a deletion request."""
serializer = DeletionApprovalSerializer(data=request.data)
serializer.is_valid(raise_exception=True)
request_id = serializer.validated_data['request_id']
action = serializer.validated_data['action']
reason = serializer.validated_data.get('reason', '')
try:
deletion_request = DeletionRequest.objects.get(pk=request_id)
except DeletionRequest.DoesNotExist:
return Response(
{"error": "Deletion request not found"},
status=status.HTTP_404_NOT_FOUND
)
# Permission is handled by the permission class; users with the permission
# can approve any deletion request. Additional ownership check for non-superusers.
if deletion_request.user != request.user and not request.user.is_superuser:
return Response(
{"error": "Permission denied"},
status=status.HTTP_403_FORBIDDEN
)
if action == "approve":
deletion_request.status = DeletionRequest.STATUS_APPROVED
# TODO: Store approval reason for audit trail
# deletion_request.approval_reason = reason
# deletion_request.reviewed_at = timezone.now()
# deletion_request.reviewed_by = request.user
deletion_request.save()
# Perform the actual deletion
# This would integrate with the AI deletion manager
return Response({
"status": "success",
"message": "Deletion request approved",
"request_id": request_id
})
else: # action == "reject"
deletion_request.status = DeletionRequest.STATUS_REJECTED
# TODO: Store rejection reason for audit trail
# deletion_request.rejection_reason = reason
# deletion_request.reviewed_at = timezone.now()
# deletion_request.reviewed_by = request.user
deletion_request.save()
return Response({
"status": "success",
"message": "Deletion request rejected",
"request_id": request_id
})

View file

@ -15,11 +15,15 @@ from drf_spectacular.views import SpectacularAPIView
from drf_spectacular.views import SpectacularSwaggerView
from rest_framework.routers import DefaultRouter
from documents.views import AIConfigurationView
from documents.views import AISuggestionsView
from documents.views import ApplyAISuggestionsView
from documents.views import BulkDownloadView
from documents.views import BulkEditObjectsView
from documents.views import BulkEditView
from documents.views import CorrespondentViewSet
from documents.views import CustomFieldViewSet
from documents.views import DeletionApprovalView
from documents.views import DocumentTypeViewSet
from documents.views import GlobalSearchView
from documents.views import IndexView
@ -200,6 +204,33 @@ urlpatterns = [
TrashView.as_view(),
name="trash",
),
re_path(
"^ai/",
include(
[
re_path(
"^suggestions/$",
AISuggestionsView.as_view(),
name="ai_suggestions",
),
re_path(
"^suggestions/apply/$",
ApplyAISuggestionsView.as_view(),
name="ai_apply_suggestions",
),
re_path(
"^config/$",
AIConfigurationView.as_view(),
name="ai_config",
),
re_path(
"^deletions/approve/$",
DeletionApprovalView.as_view(),
name="ai_deletion_approval",
),
],
),
),
re_path(
r"^oauth/callback/",
OauthCallbackView.as_view(),