diff --git a/src/documents/ai_deletion_manager.py b/src/documents/ai_deletion_manager.py index 9730831b9..21848b813 100644 --- a/src/documents/ai_deletion_manager.py +++ b/src/documents/ai_deletion_manager.py @@ -14,15 +14,9 @@ According to agents.md requirements: from __future__ import annotations import logging -from datetime import datetime -from typing import TYPE_CHECKING, Dict, List, Optional, Any +from typing import Any -from django.conf import settings from django.contrib.auth.models import User -from django.utils import timezone - -if TYPE_CHECKING: - from documents.models import Document, DeletionRequest logger = logging.getLogger("paperless.ai_deletion") @@ -30,35 +24,35 @@ logger = logging.getLogger("paperless.ai_deletion") class AIDeletionManager: """ Manager for AI-initiated deletion requests. - + Ensures all deletions go through proper user approval workflow. """ - + @staticmethod def create_deletion_request( - documents: List, + documents: list, reason: str, user: User, - impact_analysis: Optional[Dict[str, Any]] = None, + impact_analysis: dict[str, Any] | None = None, ): """ Create a new deletion request that requires user approval. - + Args: documents: List of documents to be deleted reason: Detailed explanation from AI user: User who must approve impact_analysis: Optional detailed impact analysis - + Returns: Created DeletionRequest instance """ from documents.models import DeletionRequest - + # Analyze impact if not provided if impact_analysis is None: impact_analysis = AIDeletionManager._analyze_impact(documents) - + # Create request request = DeletionRequest.objects.create( requested_by_ai=True, @@ -67,25 +61,25 @@ class AIDeletionManager: status=DeletionRequest.STATUS_PENDING, impact_summary=impact_analysis, ) - + # Add documents request.documents.set(documents) - + logger.info( f"Created deletion request {request.id} for {len(documents)} documents " - f"requiring approval from user {user.username}" + f"requiring approval from user {user.username}", ) - + # TODO: Send notification to user about pending deletion request # This could be via email, in-app notification, or both - + return request - + @staticmethod - def _analyze_impact(documents: List) -> Dict[str, Any]: + def _analyze_impact(documents: list) -> dict[str, Any]: """ Analyze the impact of deleting the given documents. - + Returns comprehensive information about what will be affected. 
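# Editor's note: a minimal, hedged usage sketch of the approval workflow
# above (not part of the patch). Assumes a configured Django environment;
# the username and document IDs below are placeholders.
from django.contrib.auth.models import User
from documents.ai_deletion_manager import AIDeletionManager
from documents.models import Document

user = User.objects.get(username="admin")  # hypothetical approver
docs = list(Document.objects.filter(pk__in=[1, 2]))  # hypothetical documents

request = AIDeletionManager.create_deletion_request(
    documents=docs,
    reason="Duplicate scans of the same invoice",
    user=user,
)
# The request starts in STATUS_PENDING; nothing is deleted until the user acts.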
""" impact = { @@ -100,7 +94,7 @@ class AIDeletionManager: "latest": None, }, } - + for doc in documents: # Document details doc_info = { @@ -112,77 +106,85 @@ class AIDeletionManager: "tags": [tag.name for tag in doc.tags.all()], } impact["documents"].append(doc_info) - + # Track size (if available) # Note: This would need actual file size tracking - + # Track affected metadata if doc.correspondent: impact["affected_correspondents"].add(doc.correspondent.name) - + if doc.document_type: impact["affected_types"].add(doc.document_type.name) - + for tag in doc.tags.all(): impact["affected_tags"].add(tag.name) - + # Track date range if doc.created: - if impact["date_range"]["earliest"] is None or doc.created < impact["date_range"]["earliest"]: + if ( + impact["date_range"]["earliest"] is None + or doc.created < impact["date_range"]["earliest"] + ): impact["date_range"]["earliest"] = doc.created - - if impact["date_range"]["latest"] is None or doc.created > impact["date_range"]["latest"]: + + if ( + impact["date_range"]["latest"] is None + or doc.created > impact["date_range"]["latest"] + ): impact["date_range"]["latest"] = doc.created - + # Convert sets to lists for JSON serialization impact["affected_tags"] = list(impact["affected_tags"]) impact["affected_correspondents"] = list(impact["affected_correspondents"]) impact["affected_types"] = list(impact["affected_types"]) - + # Convert dates to ISO format if impact["date_range"]["earliest"]: - impact["date_range"]["earliest"] = impact["date_range"]["earliest"].isoformat() + impact["date_range"]["earliest"] = impact["date_range"][ + "earliest" + ].isoformat() if impact["date_range"]["latest"]: impact["date_range"]["latest"] = impact["date_range"]["latest"].isoformat() - + return impact - + @staticmethod - def get_pending_requests(user: User) -> List: + def get_pending_requests(user: User) -> list: """ Get all pending deletion requests for a user. - + Args: user: User to get requests for - + Returns: List of pending DeletionRequest instances """ from documents.models import DeletionRequest - + return list( DeletionRequest.objects.filter( user=user, status=DeletionRequest.STATUS_PENDING, - ) + ), ) - + @staticmethod def format_deletion_request_for_user(request) -> str: """ Format a deletion request into a human-readable message. - + This provides comprehensive information to the user about what will be deleted, as required by agents.md. 
- + Args: request: DeletionRequest to format - + Returns: Formatted message string """ impact = request.impact_summary - + message = f""" =========================================== AI DELETION REQUEST #{request.id} @@ -192,27 +194,27 @@ REASON: {request.ai_reason} IMPACT SUMMARY: -- Number of documents: {impact.get('document_count', 0)} -- Affected tags: {', '.join(impact.get('affected_tags', [])) or 'None'} -- Affected correspondents: {', '.join(impact.get('affected_correspondents', [])) or 'None'} -- Affected document types: {', '.join(impact.get('affected_types', [])) or 'None'} +- Number of documents: {impact.get("document_count", 0)} +- Affected tags: {", ".join(impact.get("affected_tags", [])) or "None"} +- Affected correspondents: {", ".join(impact.get("affected_correspondents", [])) or "None"} +- Affected document types: {", ".join(impact.get("affected_types", [])) or "None"} DATE RANGE: -- Earliest: {impact.get('date_range', {}).get('earliest', 'Unknown')} -- Latest: {impact.get('date_range', {}).get('latest', 'Unknown')} +- Earliest: {impact.get("date_range", {}).get("earliest", "Unknown")} +- Latest: {impact.get("date_range", {}).get("latest", "Unknown")} DOCUMENTS TO BE DELETED: """ - - for i, doc in enumerate(impact.get('documents', []), 1): + + for i, doc in enumerate(impact.get("documents", []), 1): message += f""" -{i}. ID: {doc['id']} - {doc['title']} - Created: {doc['created']} - Correspondent: {doc['correspondent'] or 'None'} - Type: {doc['document_type'] or 'None'} - Tags: {', '.join(doc['tags']) or 'None'} +{i}. ID: {doc["id"]} - {doc["title"]} + Created: {doc["created"]} + Correspondent: {doc["correspondent"] or "None"} + Type: {doc["document_type"] or "None"} + Tags: {", ".join(doc["tags"]) or "None"} """ - + message += """ =========================================== @@ -223,21 +225,21 @@ No files will be deleted until you confirm this action. Please review the above information carefully before approving or rejecting this request. """ - + return message - + @staticmethod def can_ai_delete_automatically() -> bool: """ Check if AI is allowed to delete automatically. - + According to agents.md, AI should NEVER delete without user approval. This method always returns False as a safety measure. - + Returns: Always False - AI cannot auto-delete """ return False -__all__ = ['AIDeletionManager'] +__all__ = ["AIDeletionManager"] diff --git a/src/documents/ai_scanner.py b/src/documents/ai_scanner.py index c7fe254e1..36cdf2437 100644 --- a/src/documents/ai_scanner.py +++ b/src/documents/ai_scanner.py @@ -20,21 +20,16 @@ According to agents.md requirements: from __future__ import annotations import logging -from typing import TYPE_CHECKING, Dict, List, Optional, Any, Tuple +from typing import TYPE_CHECKING +from typing import Any from django.conf import settings from django.db import transaction if TYPE_CHECKING: - from documents.models import ( - Document, - Tag, - Correspondent, - DocumentType, - StoragePath, - CustomField, - Workflow, - ) + from documents.models import CustomField + from documents.models import Document + from documents.models import Workflow logger = logging.getLogger("paperless.ai_scanner") @@ -45,17 +40,25 @@ class AIScanResult: """ def __init__(self): - self.tags: List[Tuple[int, float]] = [] # [(tag_id, confidence), ...] 
- self.correspondent: Optional[Tuple[int, float]] = None # (correspondent_id, confidence) - self.document_type: Optional[Tuple[int, float]] = None # (document_type_id, confidence) - self.storage_path: Optional[Tuple[int, float]] = None # (storage_path_id, confidence) - self.custom_fields: Dict[int, Tuple[Any, float]] = {} # {field_id: (value, confidence), ...} - self.workflows: List[Tuple[int, float]] = [] # [(workflow_id, confidence), ...] - self.extracted_entities: Dict[str, Any] = {} # NER results - self.title_suggestion: Optional[str] = None - self.metadata: Dict[str, Any] = {} # Additional metadata + self.tags: list[tuple[int, float]] = [] # [(tag_id, confidence), ...] + self.correspondent: tuple[int, float] | None = ( + None # (correspondent_id, confidence) + ) + self.document_type: tuple[int, float] | None = ( + None # (document_type_id, confidence) + ) + self.storage_path: tuple[int, float] | None = ( + None # (storage_path_id, confidence) + ) + self.custom_fields: dict[ + int, tuple[Any, float], + ] = {} # {field_id: (value, confidence), ...} + self.workflows: list[tuple[int, float]] = [] # [(workflow_id, confidence), ...] + self.extracted_entities: dict[str, Any] = {} # NER results + self.title_suggestion: str | None = None + self.metadata: dict[str, Any] = {} # Additional metadata - def to_dict(self) -> Dict[str, Any]: + def to_dict(self) -> dict[str, Any]: """Convert scan results to dictionary for logging/serialization.""" return { "tags": self.tags, @@ -73,7 +76,7 @@ class AIScanResult: class AIDocumentScanner: """ Comprehensive AI scanner for automatic document metadata management. - + This scanner integrates all ML/AI capabilities to provide automatic: - Tag assignment based on content analysis - Correspondent detection from document text @@ -81,7 +84,7 @@ class AIDocumentScanner: - Storage path suggestion based on content/type - Custom field extraction using NER - Workflow assignment based on document characteristics - + Features: - High confidence threshold (>80%) for automatic application - Medium confidence (60-80%) for suggestions requiring user review @@ -99,7 +102,7 @@ class AIDocumentScanner: ): """ Initialize AI scanner. 
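# Editor's note: a standalone sketch of the confidence triage described
# above: >= auto_apply_threshold (0.80) is applied automatically, 0.60-0.80
# becomes a suggestion requiring review, and anything lower is dropped.
AUTO_APPLY_THRESHOLD = 0.80
SUGGEST_THRESHOLD = 0.60

def triage(candidates: list[tuple[int, float]]) -> tuple[list[int], list[int]]:
    applied = [cid for cid, conf in candidates if conf >= AUTO_APPLY_THRESHOLD]
    suggested = [
        cid
        for cid, conf in candidates
        if SUGGEST_THRESHOLD <= conf < AUTO_APPLY_THRESHOLD
    ]
    return applied, suggested

print(triage([(1, 0.85), (2, 0.70), (3, 0.40)]))  # ([1], [2])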
- + Args: auto_apply_threshold: Confidence threshold for automatic application (default: 0.80) suggest_threshold: Confidence threshold for suggestions (default: 0.60) @@ -108,7 +111,7 @@ class AIDocumentScanner: """ self.auto_apply_threshold = auto_apply_threshold self.suggest_threshold = suggest_threshold - + # Check settings for ML/OCR enablement self.ml_enabled = ( enable_ml_features @@ -120,16 +123,16 @@ class AIDocumentScanner: if enable_advanced_ocr is not None else getattr(settings, "PAPERLESS_ENABLE_ADVANCED_OCR", True) ) - + # Lazy loading of ML components self._classifier = None self._ner_extractor = None self._semantic_search = None self._table_extractor = None - + logger.info( f"AIDocumentScanner initialized - ML: {self.ml_enabled}, " - f"Advanced OCR: {self.advanced_ocr_enabled}" + f"Advanced OCR: {self.advanced_ocr_enabled}", ) def _get_classifier(self): @@ -137,6 +140,7 @@ class AIDocumentScanner: if self._classifier is None and self.ml_enabled: try: from documents.ml.classifier import TransformerDocumentClassifier + self._classifier = TransformerDocumentClassifier() logger.info("ML classifier loaded successfully") except Exception as e: @@ -149,6 +153,7 @@ class AIDocumentScanner: if self._ner_extractor is None and self.ml_enabled: try: from documents.ml.ner import DocumentNER + self._ner_extractor = DocumentNER() logger.info("NER extractor loaded successfully") except Exception as e: @@ -160,6 +165,7 @@ class AIDocumentScanner: if self._semantic_search is None and self.ml_enabled: try: from documents.ml.semantic_search import SemanticSearch + self._semantic_search = SemanticSearch() logger.info("Semantic search loaded successfully") except Exception as e: @@ -171,6 +177,7 @@ class AIDocumentScanner: if self._table_extractor is None and self.advanced_ocr_enabled: try: from documents.ocr.table_extractor import TableExtractor + self._table_extractor = TableExtractor() logger.info("Table extractor loaded successfully") except Exception as e: @@ -185,90 +192,108 @@ class AIDocumentScanner: ) -> AIScanResult: """ Perform comprehensive AI scan of a document. - + This is the main entry point for document scanning. It orchestrates all AI/ML components to analyze the document and generate suggestions. 
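# Editor's note: a generic, standalone sketch of the lazy-loading pattern
# used by _get_classifier() and its siblings above: the heavy import happens
# once, on first use, and a failure degrades to None rather than breaking
# document consumption. JSONDecoder stands in for a heavy ML dependency.
import logging

logger = logging.getLogger(__name__)

class LazyComponent:
    def __init__(self) -> None:
        self._impl = None

    def get(self):
        if self._impl is None:
            try:
                from json import JSONDecoder  # stand-in for an ML import

                self._impl = JSONDecoder()
                logger.info("Component loaded successfully")
            except Exception as e:
                logger.warning(f"Failed to load component: {e}")
        return self._impl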
- + Args: document: The Document model instance document_text: The extracted text content original_file_path: Path to original file (for OCR/image analysis) - + Returns: AIScanResult containing all suggestions and extracted data """ - logger.info(f"Starting AI scan for document: {document.title} (ID: {document.pk})") - + logger.info( + f"Starting AI scan for document: {document.title} (ID: {document.pk})", + ) + result = AIScanResult() - + # Extract entities using NER result.extracted_entities = self._extract_entities(document_text) - + # Analyze and suggest tags - result.tags = self._suggest_tags(document, document_text, result.extracted_entities) - + result.tags = self._suggest_tags( + document, document_text, result.extracted_entities, + ) + # Detect correspondent result.correspondent = self._detect_correspondent( - document, document_text, result.extracted_entities + document, + document_text, + result.extracted_entities, ) - + # Classify document type result.document_type = self._classify_document_type( - document, document_text, result.extracted_entities + document, + document_text, + result.extracted_entities, ) - + # Suggest storage path result.storage_path = self._suggest_storage_path( - document, document_text, result + document, + document_text, + result, ) - + # Extract custom fields result.custom_fields = self._extract_custom_fields( - document, document_text, result.extracted_entities + document, + document_text, + result.extracted_entities, ) - + # Suggest workflows result.workflows = self._suggest_workflows(document, document_text, result) - + # Generate improved title suggestion result.title_suggestion = self._suggest_title( - document, document_text, result.extracted_entities + document, + document_text, + result.extracted_entities, ) - + # Extract tables if advanced OCR enabled if self.advanced_ocr_enabled and original_file_path: result.metadata["tables"] = self._extract_tables(original_file_path) - + logger.info(f"AI scan completed for document {document.pk}") logger.debug(f"Scan results: {result.to_dict()}") - + return result - def _extract_entities(self, text: str) -> Dict[str, Any]: + def _extract_entities(self, text: str) -> dict[str, Any]: """ Extract named entities from document text using NER. - + Returns: Dictionary with extracted entities (persons, orgs, dates, amounts, etc.) 
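# Editor's note: a hedged usage sketch of the scan_document entry point
# above, e.g. from a Django shell. Assumes a configured environment with at
# least one stored document; original_file_path=None skips table extraction.
from documents.ai_scanner import get_ai_scanner
from documents.models import Document

doc = Document.objects.first()
scanner = get_ai_scanner()
result = scanner.scan_document(
    document=doc,
    document_text=doc.content or "",
    original_file_path=None,
)
print(result.to_dict())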
""" ner = self._get_ner_extractor() if not ner: return {} - + try: # Use extract_all to get comprehensive entity extraction entities = ner.extract_all(text) - + # Convert string lists to dict format for consistency for key in ["persons", "organizations", "locations", "misc"]: if key in entities and isinstance(entities[key], list): - entities[key] = [{"text": e} if isinstance(e, str) else e for e in entities[key]] - + entities[key] = [ + {"text": e} if isinstance(e, str) else e for e in entities[key] + ] + for key in ["dates", "amounts"]: if key in entities and isinstance(entities[key], list): - entities[key] = [{"text": e} if isinstance(e, str) else e for e in entities[key]] - - logger.debug(f"Extracted entities from NER") + entities[key] = [ + {"text": e} if isinstance(e, str) else e for e in entities[key] + ] + + logger.debug("Extracted entities from NER") return entities except Exception as e: logger.error(f"Entity extraction failed: {e}", exc_info=True) @@ -278,156 +303,157 @@ class AIDocumentScanner: self, document: Document, text: str, - entities: Dict[str, Any], - ) -> List[Tuple[int, float]]: + entities: dict[str, Any], + ) -> list[tuple[int, float]]: """ Suggest relevant tags based on document content and entities. - + Uses a combination of: - Keyword matching with existing tag patterns - ML classification if available - Entity-based suggestions (e.g., organization -> company tag) - + Returns: List of (tag_id, confidence) tuples """ - from documents.models import Tag from documents.matching import match_tags - + from documents.models import Tag + suggestions = [] - + try: # Use existing matching logic matched_tags = match_tags(document, self._get_classifier()) - + # Add confidence scores based on matching strength for tag in matched_tags: confidence = 0.85 # High confidence for matched tags suggestions.append((tag.id, confidence)) - + # Additional entity-based suggestions if entities: # Suggest tags based on detected entities all_tags = Tag.objects.all() - + # Check for organization entities -> company/business tags if entities.get("organizations"): for tag in all_tags.filter(name__icontains="company"): suggestions.append((tag.id, 0.70)) - + # Check for date entities -> tax/financial tags if year-end if entities.get("dates"): for tag in all_tags.filter(name__icontains="tax"): suggestions.append((tag.id, 0.65)) - + # Remove duplicates, keep highest confidence seen = {} for tag_id, conf in suggestions: if tag_id not in seen or conf > seen[tag_id]: seen[tag_id] = conf - + suggestions = [(tid, conf) for tid, conf in seen.items()] suggestions.sort(key=lambda x: x[1], reverse=True) - + logger.debug(f"Suggested {len(suggestions)} tags") - + except Exception as e: logger.error(f"Tag suggestion failed: {e}", exc_info=True) - + return suggestions def _detect_correspondent( self, document: Document, text: str, - entities: Dict[str, Any], - ) -> Optional[Tuple[int, float]]: + entities: dict[str, Any], + ) -> tuple[int, float] | None: """ Detect correspondent based on document content and entities. 
- + Uses: - Organization entities from NER - Email domains - Existing correspondent matching patterns - + Returns: (correspondent_id, confidence) or None """ - from documents.models import Correspondent from documents.matching import match_correspondents - + from documents.models import Correspondent + try: # Use existing matching logic - matched_correspondents = match_correspondents(document, self._get_classifier()) - + matched_correspondents = match_correspondents( + document, self._get_classifier(), + ) + if matched_correspondents: correspondent = matched_correspondents[0] confidence = 0.85 logger.debug( f"Detected correspondent: {correspondent.name} " - f"(confidence: {confidence})" + f"(confidence: {confidence})", ) return (correspondent.id, confidence) - + # Try to match based on NER organizations if entities.get("organizations"): org_name = entities["organizations"][0]["text"] # Try to find existing correspondent with similar name correspondents = Correspondent.objects.filter( - name__icontains=org_name[:20] # First 20 chars + name__icontains=org_name[:20], # First 20 chars ) if correspondents.exists(): correspondent = correspondents.first() confidence = 0.70 logger.debug( f"Detected correspondent from NER: {correspondent.name} " - f"(confidence: {confidence})" + f"(confidence: {confidence})", ) return (correspondent.id, confidence) - + except Exception as e: logger.error(f"Correspondent detection failed: {e}", exc_info=True) - + return None def _classify_document_type( self, document: Document, text: str, - entities: Dict[str, Any], - ) -> Optional[Tuple[int, float]]: + entities: dict[str, Any], + ) -> tuple[int, float] | None: """ Classify document type using ML and content analysis. - + Returns: (document_type_id, confidence) or None """ - from documents.models import DocumentType from documents.matching import match_document_types - + try: # Use existing matching logic matched_types = match_document_types(document, self._get_classifier()) - + if matched_types: doc_type = matched_types[0] confidence = 0.85 logger.debug( f"Classified document type: {doc_type.name} " - f"(confidence: {confidence})" + f"(confidence: {confidence})", ) return (doc_type.id, confidence) - + # ML-based classification if available classifier = self._get_classifier() if classifier and hasattr(classifier, "predict"): # This would need a trained model with document type labels # For now, fall back to pattern matching pass - + except Exception as e: logger.error(f"Document type classification failed: {e}", exc_info=True) - + return None def _suggest_storage_path( @@ -435,127 +461,131 @@ class AIDocumentScanner: document: Document, text: str, scan_result: AIScanResult, - ) -> Optional[Tuple[int, float]]: + ) -> tuple[int, float] | None: """ Suggest appropriate storage path based on document characteristics. 
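# Editor's note: a standalone sketch of the NER fallback in
# _detect_correspondent above: the first 20 characters of the detected
# organization are matched case-insensitively against known names.
def match_org(org: str, known: list[str]) -> str | None:
    needle = org[:20].lower()
    for name in known:
        if needle in name.lower():
            return name
    return None

print(match_org("ACME Corp", ["Acme Corporation", "Globex"]))  # Acme Corporation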
- + Returns: (storage_path_id, confidence) or None """ - from documents.models import StoragePath from documents.matching import match_storage_paths - + try: # Use existing matching logic matched_paths = match_storage_paths(document, self._get_classifier()) - + if matched_paths: storage_path = matched_paths[0] confidence = 0.80 logger.debug( f"Suggested storage path: {storage_path.name} " - f"(confidence: {confidence})" + f"(confidence: {confidence})", ) return (storage_path.id, confidence) - + except Exception as e: logger.error(f"Storage path suggestion failed: {e}", exc_info=True) - + return None def _extract_custom_fields( self, document: Document, text: str, - entities: Dict[str, Any], - ) -> Dict[int, Tuple[Any, float]]: + entities: dict[str, Any], + ) -> dict[int, tuple[Any, float]]: """ Extract values for custom fields using NER and pattern matching. - + Returns: Dictionary mapping field_id to (value, confidence) """ from documents.models import CustomField - + extracted_fields = {} - + try: custom_fields = CustomField.objects.all() - + for field in custom_fields: # Try to extract field value based on field name and type value, confidence = self._extract_field_value( - field, text, entities + field, + text, + entities, ) - + if value is not None and confidence >= self.suggest_threshold: extracted_fields[field.id] = (value, confidence) logger.debug( f"Extracted custom field '{field.name}': {value} " - f"(confidence: {confidence})" + f"(confidence: {confidence})", ) - + except Exception as e: logger.error(f"Custom field extraction failed: {e}", exc_info=True) - + return extracted_fields def _extract_field_value( self, field: CustomField, text: str, - entities: Dict[str, Any], - ) -> Tuple[Any, float]: + entities: dict[str, Any], + ) -> tuple[Any, float]: """ Extract a single custom field value. - + Returns: (value, confidence) tuple """ field_name_lower = field.name.lower() - + # Date fields if "date" in field_name_lower: dates = entities.get("dates", []) if dates: return (dates[0]["text"], 0.75) - + # Amount/price fields - if any(keyword in field_name_lower for keyword in ["amount", "price", "cost", "total"]): + if any( + keyword in field_name_lower + for keyword in ["amount", "price", "cost", "total"] + ): amounts = entities.get("amounts", []) if amounts: return (amounts[0]["text"], 0.75) - + # Invoice number fields if "invoice" in field_name_lower: invoice_numbers = entities.get("invoice_numbers", []) if invoice_numbers: return (invoice_numbers[0], 0.80) - + # Email fields if "email" in field_name_lower: emails = entities.get("emails", []) if emails: return (emails[0], 0.85) - + # Phone fields if "phone" in field_name_lower: phones = entities.get("phones", []) if phones: return (phones[0], 0.85) - + # Person name fields if "name" in field_name_lower or "person" in field_name_lower: persons = entities.get("persons", []) if persons: return (persons[0]["text"], 0.70) - + # Organization fields if "company" in field_name_lower or "organization" in field_name_lower: orgs = entities.get("organizations", []) if orgs: return (orgs[0]["text"], 0.70) - + return (None, 0.0) def _suggest_workflows( @@ -563,40 +593,43 @@ class AIDocumentScanner: document: Document, text: str, scan_result: AIScanResult, - ) -> List[Tuple[int, float]]: + ) -> list[tuple[int, float]]: """ Suggest relevant workflows based on document characteristics. 
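# Editor's note: a standalone, simplified sketch of the keyword dispatch in
# _extract_field_value above: the field name selects an entity category and
# a fixed per-category confidence (entity values are plain strings here).
from typing import Any

RULES: list[tuple[tuple[str, ...], str, float]] = [
    (("date",), "dates", 0.75),
    (("amount", "price", "cost", "total"), "amounts", 0.75),
    (("invoice",), "invoice_numbers", 0.80),
    (("email",), "emails", 0.85),
]

def extract(field_name: str, entities: dict[str, Any]) -> tuple[Any, float]:
    name = field_name.lower()
    for keywords, category, confidence in RULES:
        if any(k in name for k in keywords) and entities.get(category):
            return entities[category][0], confidence
    return (None, 0.0)

print(extract("Invoice Number", {"invoice_numbers": ["INV-001"]}))  # ('INV-001', 0.8)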
- + Returns: List of (workflow_id, confidence) tuples """ - from documents.models import Workflow, WorkflowTrigger - + from documents.models import Workflow + from documents.models import WorkflowTrigger + suggestions = [] - + try: # Get all workflows with consumption triggers workflows = Workflow.objects.filter( enabled=True, triggers__type=WorkflowTrigger.WorkflowTriggerType.CONSUMPTION, ).distinct() - + for workflow in workflows: # Evaluate workflow conditions against scan results confidence = self._evaluate_workflow_match( - workflow, document, scan_result + workflow, + document, + scan_result, ) - + if confidence >= self.suggest_threshold: suggestions.append((workflow.id, confidence)) logger.debug( f"Suggested workflow: {workflow.name} " - f"(confidence: {confidence})" + f"(confidence: {confidence})", ) - + except Exception as e: logger.error(f"Workflow suggestion failed: {e}", exc_info=True) - + return suggestions def _evaluate_workflow_match( @@ -607,80 +640,80 @@ class AIDocumentScanner: ) -> float: """ Evaluate how well a workflow matches the document. - + Returns: Confidence score (0.0 to 1.0) """ # This is a simplified evaluation # In practice, you'd check workflow triggers and conditions - + confidence = 0.5 # Base confidence - + # Increase confidence if document type matches workflow expectations if scan_result.document_type and workflow.actions.exists(): confidence += 0.2 - + # Increase confidence if correspondent matches if scan_result.correspondent: confidence += 0.15 - + # Increase confidence if tags match if scan_result.tags: confidence += 0.15 - + return min(confidence, 1.0) def _suggest_title( self, document: Document, text: str, - entities: Dict[str, Any], - ) -> Optional[str]: + entities: dict[str, Any], + ) -> str | None: """ Generate an improved title suggestion based on document content. - + Returns: Suggested title or None """ try: # Extract key information for title title_parts = [] - + # Add document type if detected if entities.get("document_type"): title_parts.append(entities["document_type"]) - + # Add primary organization orgs = entities.get("organizations", []) if orgs: title_parts.append(orgs[0]["text"][:30]) # Limit length - + # Add date if available dates = entities.get("dates", []) if dates: title_parts.append(dates[0]["text"]) - + if title_parts: suggested_title = " - ".join(title_parts) logger.debug(f"Generated title suggestion: {suggested_title}") return suggested_title[:127] # Respect title length limit - + except Exception as e: logger.error(f"Title suggestion failed: {e}", exc_info=True) - + return None - def _extract_tables(self, file_path: str) -> List[Dict[str, Any]]: + def _extract_tables(self, file_path: str) -> list[dict[str, Any]]: """ Extract tables from document using advanced OCR. - + Returns: List of extracted tables with data and metadata """ extractor = self._get_table_extractor() if not extractor: return [] - + try: tables = extractor.extract_tables_from_image(file_path) logger.debug(f"Extracted {len(tables)} tables from document") @@ -695,21 +728,24 @@ class AIDocumentScanner: scan_result: AIScanResult, auto_apply: bool = True, user_confirmed: bool = False, - ) -> Dict[str, Any]: + ) -> dict[str, Any]: """ Apply AI scan results to document. 
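# Editor's note: a standalone sketch of the additive scoring in
# _evaluate_workflow_match above: a base confidence of 0.5 plus fixed
# bonuses per matching signal, capped at 1.0.
def workflow_score(has_type: bool, has_correspondent: bool, has_tags: bool) -> float:
    confidence = 0.5
    if has_type:
        confidence += 0.2
    if has_correspondent:
        confidence += 0.15
    if has_tags:
        confidence += 0.15
    return min(confidence, 1.0)

print(workflow_score(True, False, True))  # 0.85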
- + Args: document: Document to update scan_result: AI scan results auto_apply: Whether to auto-apply high confidence suggestions user_confirmed: Whether user has confirmed low-confidence changes - + Returns: Dictionary with applied changes and pending suggestions """ - from documents.models import Tag, Correspondent, DocumentType, StoragePath - + from documents.models import Correspondent + from documents.models import DocumentType + from documents.models import StoragePath + from documents.models import Tag + applied = { "tags": [], "correspondent": None, @@ -717,7 +753,7 @@ class AIDocumentScanner: "storage_path": None, "custom_fields": {}, } - + suggestions = { "tags": [], "correspondent": None, @@ -725,7 +761,7 @@ class AIDocumentScanner: "storage_path": None, "custom_fields": {}, } - + try: with transaction.atomic(): # Apply tags @@ -737,12 +773,14 @@ class AIDocumentScanner: logger.info(f"Auto-applied tag: {tag.name}") elif confidence >= self.suggest_threshold: tag = Tag.objects.get(pk=tag_id) - suggestions["tags"].append({ - "id": tag_id, - "name": tag.name, - "confidence": confidence, - }) - + suggestions["tags"].append( + { + "id": tag_id, + "name": tag.name, + "confidence": confidence, + }, + ) + # Apply correspondent if scan_result.correspondent: corr_id, confidence = scan_result.correspondent @@ -761,7 +799,7 @@ class AIDocumentScanner: "name": correspondent.name, "confidence": confidence, } - + # Apply document type if scan_result.document_type: type_id, confidence = scan_result.document_type @@ -780,7 +818,7 @@ class AIDocumentScanner: "name": doc_type.name, "confidence": confidence, } - + # Apply storage path if scan_result.storage_path: path_id, confidence = scan_result.storage_path @@ -799,13 +837,13 @@ class AIDocumentScanner: "name": storage_path.name, "confidence": confidence, } - + # Save document with changes document.save() - + except Exception as e: logger.error(f"Failed to apply scan results: {e}", exc_info=True) - + return { "applied": applied, "suggestions": suggestions, @@ -819,7 +857,7 @@ _scanner_instance = None def get_ai_scanner() -> AIDocumentScanner: """ Get or create the global AI scanner instance. - + Returns: AIDocumentScanner instance """ diff --git a/src/documents/consumer.py b/src/documents/consumer.py index aea94a6fe..6f45b62a5 100644 --- a/src/documents/consumer.py +++ b/src/documents/consumer.py @@ -756,22 +756,22 @@ class ConsumerPlugin( def _run_ai_scanner(self, document, text): """ Run AI scanner on the document to automatically detect and apply metadata. - + This is called during document consumption to leverage AI/ML capabilities for automatic metadata management as specified in agents.md. 
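# Editor's note: continuing the hedged sketch from the scan_document example
# above (same hypothetical `scanner`, `doc`, and `result`): apply the scan,
# then surface the medium-confidence suggestions for user review.
results = scanner.apply_scan_results(
    document=doc,
    scan_result=result,
    auto_apply=True,  # high-confidence changes are written immediately
    user_confirmed=False,  # lower-confidence changes remain suggestions
)
for tag in results["suggestions"]["tags"]:
    print(f"Suggested tag {tag['name']} ({tag['confidence']:.0%})")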
- + Args: document: The Document model instance text: The extracted document text """ try: from documents.ai_scanner import get_ai_scanner - + scanner = get_ai_scanner() - + # Get the original file path if available original_file_path = str(self.working_copy) if self.working_copy else None - + # Perform comprehensive AI scan self.log.info(f"Running AI scanner on document: {document.title}") scan_result = scanner.scan_document( @@ -779,65 +779,65 @@ class ConsumerPlugin( document_text=text, original_file_path=original_file_path, ) - + # Apply scan results (auto-apply high confidence, suggest medium confidence) results = scanner.apply_scan_results( document=document, scan_result=scan_result, auto_apply=True, # Auto-apply high confidence suggestions ) - + # Log what was applied and suggested if results["applied"]["tags"]: self.log.info( - f"AI auto-applied tags: {[t['name'] for t in results['applied']['tags']]}" + f"AI auto-applied tags: {[t['name'] for t in results['applied']['tags']]}", ) - + if results["applied"]["correspondent"]: self.log.info( - f"AI auto-applied correspondent: {results['applied']['correspondent']['name']}" + f"AI auto-applied correspondent: {results['applied']['correspondent']['name']}", ) - + if results["applied"]["document_type"]: self.log.info( - f"AI auto-applied document type: {results['applied']['document_type']['name']}" + f"AI auto-applied document type: {results['applied']['document_type']['name']}", ) - + if results["applied"]["storage_path"]: self.log.info( - f"AI auto-applied storage path: {results['applied']['storage_path']['name']}" + f"AI auto-applied storage path: {results['applied']['storage_path']['name']}", ) - + # Log suggestions for user review if results["suggestions"]["tags"]: self.log.info( f"AI suggested tags (require review): " - f"{[t['name'] for t in results['suggestions']['tags']]}" + f"{[t['name'] for t in results['suggestions']['tags']]}", ) - + if results["suggestions"]["correspondent"]: self.log.info( f"AI suggested correspondent (requires review): " - f"{results['suggestions']['correspondent']['name']}" + f"{results['suggestions']['correspondent']['name']}", ) - + if results["suggestions"]["document_type"]: self.log.info( f"AI suggested document type (requires review): " - f"{results['suggestions']['document_type']['name']}" + f"{results['suggestions']['document_type']['name']}", ) - + if results["suggestions"]["storage_path"]: self.log.info( f"AI suggested storage path (requires review): " - f"{results['suggestions']['storage_path']['name']}" + f"{results['suggestions']['storage_path']['name']}", ) - + # Store suggestions in document metadata for UI to display # This allows the frontend to show AI suggestions to users - if not hasattr(document, '_ai_suggestions'): + if not hasattr(document, "_ai_suggestions"): document._ai_suggestions = results["suggestions"] - + except ImportError: # AI scanner not available, skip self.log.debug("AI scanner not available, skipping AI analysis")
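# Editor's note: a hedged sketch of the consumption-time contract
# established above (reusing the hypothetical `doc` from earlier): after
# _run_ai_scanner completes, any medium-confidence suggestions are stashed
# on the in-memory document instance for the frontend to read.
suggestions = getattr(doc, "_ai_suggestions", None)
if suggestions is not None:
    print("Pending AI suggestions:", suggestions)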