Initial exploration: AI Scanner linting and pre-commit hooks

Co-authored-by: dawnsystem <42047891+dawnsystem@users.noreply.github.com>
copilot-swe-agent[bot] 2025-11-12 13:09:03 +00:00
parent 496a9e7b7b
commit 2d7345f0bc
3 changed files with 308 additions and 268 deletions

@@ -14,15 +14,9 @@ According to agents.md requirements:
from __future__ import annotations
import logging
from datetime import datetime
from typing import TYPE_CHECKING, Dict, List, Optional, Any
from typing import Any
from django.conf import settings
from django.contrib.auth.models import User
from django.utils import timezone
if TYPE_CHECKING:
from documents.models import Document, DeletionRequest
logger = logging.getLogger("paperless.ai_deletion")
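The import hunk above shows the pattern repeated throughout this commit: the typing.Dict/List/Optional aliases are dropped in favour of built-in generics (PEP 585) and "X | None" unions (PEP 604), which is safe here because the module keeps "from __future__ import annotations". A minimal before/after sketch (the function name and body are illustrative only):

from __future__ import annotations  # defers annotation evaluation on older 3.x

from typing import Any


def summarize(items: list[dict[str, Any]], limit: int | None = None) -> dict[str, Any]:
    # Pre-commit spelling of the same signature:
    #   def summarize(items: List[Dict[str, Any]], limit: Optional[int] = None) -> Dict[str, Any]
    return {"count": len(items[:limit])}


print(summarize([{"id": 1}, {"id": 2}]))  # {'count': 2}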
@@ -30,35 +24,35 @@ logger = logging.getLogger("paperless.ai_deletion")
class AIDeletionManager:
"""
Manager for AI-initiated deletion requests.
Ensures all deletions go through proper user approval workflow.
"""
@staticmethod
def create_deletion_request(
documents: List,
documents: list,
reason: str,
user: User,
impact_analysis: Optional[Dict[str, Any]] = None,
impact_analysis: dict[str, Any] | None = None,
):
"""
Create a new deletion request that requires user approval.
Args:
documents: List of documents to be deleted
reason: Detailed explanation from AI
user: User who must approve
impact_analysis: Optional detailed impact analysis
Returns:
Created DeletionRequest instance
"""
from documents.models import DeletionRequest
# Analyze impact if not provided
if impact_analysis is None:
impact_analysis = AIDeletionManager._analyze_impact(documents)
# Create request
request = DeletionRequest.objects.create(
requested_by_ai=True,
@@ -67,25 +61,25 @@ class AIDeletionManager:
status=DeletionRequest.STATUS_PENDING,
impact_summary=impact_analysis,
)
# Add documents
request.documents.set(documents)
logger.info(
f"Created deletion request {request.id} for {len(documents)} documents "
f"requiring approval from user {user.username}"
f"requiring approval from user {user.username}",
)
# TODO: Send notification to user about pending deletion request
# This could be via email, in-app notification, or both
return request
@staticmethod
def _analyze_impact(documents: List) -> Dict[str, Any]:
def _analyze_impact(documents: list) -> dict[str, Any]:
"""
Analyze the impact of deleting the given documents.
Returns comprehensive information about what will be affected.
"""
impact = {
@@ -100,7 +94,7 @@ class AIDeletionManager:
"latest": None,
},
}
for doc in documents:
# Document details
doc_info = {
@@ -112,77 +106,85 @@ class AIDeletionManager:
"tags": [tag.name for tag in doc.tags.all()],
}
impact["documents"].append(doc_info)
# Track size (if available)
# Note: This would need actual file size tracking
# Track affected metadata
if doc.correspondent:
impact["affected_correspondents"].add(doc.correspondent.name)
if doc.document_type:
impact["affected_types"].add(doc.document_type.name)
for tag in doc.tags.all():
impact["affected_tags"].add(tag.name)
# Track date range
if doc.created:
if impact["date_range"]["earliest"] is None or doc.created < impact["date_range"]["earliest"]:
if (
impact["date_range"]["earliest"] is None
or doc.created < impact["date_range"]["earliest"]
):
impact["date_range"]["earliest"] = doc.created
if impact["date_range"]["latest"] is None or doc.created > impact["date_range"]["latest"]:
if (
impact["date_range"]["latest"] is None
or doc.created > impact["date_range"]["latest"]
):
impact["date_range"]["latest"] = doc.created
# Convert sets to lists for JSON serialization
impact["affected_tags"] = list(impact["affected_tags"])
impact["affected_correspondents"] = list(impact["affected_correspondents"])
impact["affected_types"] = list(impact["affected_types"])
# Convert dates to ISO format
if impact["date_range"]["earliest"]:
impact["date_range"]["earliest"] = impact["date_range"]["earliest"].isoformat()
impact["date_range"]["earliest"] = impact["date_range"][
"earliest"
].isoformat()
if impact["date_range"]["latest"]:
impact["date_range"]["latest"] = impact["date_range"]["latest"].isoformat()
return impact
@staticmethod
def get_pending_requests(user: User) -> List:
def get_pending_requests(user: User) -> list:
"""
Get all pending deletion requests for a user.
Args:
user: User to get requests for
Returns:
List of pending DeletionRequest instances
"""
from documents.models import DeletionRequest
return list(
DeletionRequest.objects.filter(
user=user,
status=DeletionRequest.STATUS_PENDING,
)
),
)
@staticmethod
def format_deletion_request_for_user(request) -> str:
"""
Format a deletion request into a human-readable message.
This provides comprehensive information to the user about what
will be deleted, as required by agents.md.
Args:
request: DeletionRequest to format
Returns:
Formatted message string
"""
impact = request.impact_summary
message = f"""
===========================================
AI DELETION REQUEST #{request.id}
@@ -192,27 +194,27 @@ REASON:
{request.ai_reason}
IMPACT SUMMARY:
- Number of documents: {impact.get('document_count', 0)}
- Affected tags: {', '.join(impact.get('affected_tags', [])) or 'None'}
- Affected correspondents: {', '.join(impact.get('affected_correspondents', [])) or 'None'}
- Affected document types: {', '.join(impact.get('affected_types', [])) or 'None'}
- Number of documents: {impact.get("document_count", 0)}
- Affected tags: {", ".join(impact.get("affected_tags", [])) or "None"}
- Affected correspondents: {", ".join(impact.get("affected_correspondents", [])) or "None"}
- Affected document types: {", ".join(impact.get("affected_types", [])) or "None"}
DATE RANGE:
- Earliest: {impact.get('date_range', {}).get('earliest', 'Unknown')}
- Latest: {impact.get('date_range', {}).get('latest', 'Unknown')}
- Earliest: {impact.get("date_range", {}).get("earliest", "Unknown")}
- Latest: {impact.get("date_range", {}).get("latest", "Unknown")}
DOCUMENTS TO BE DELETED:
"""
for i, doc in enumerate(impact.get('documents', []), 1):
for i, doc in enumerate(impact.get("documents", []), 1):
message += f"""
{i}. ID: {doc['id']} - {doc['title']}
Created: {doc['created']}
Correspondent: {doc['correspondent'] or 'None'}
Type: {doc['document_type'] or 'None'}
Tags: {', '.join(doc['tags']) or 'None'}
{i}. ID: {doc["id"]} - {doc["title"]}
Created: {doc["created"]}
Correspondent: {doc["correspondent"] or "None"}
Type: {doc["document_type"] or "None"}
Tags: {", ".join(doc["tags"]) or "None"}
"""
message += """
===========================================
@@ -223,21 +225,21 @@ No files will be deleted until you confirm this action.
Please review the above information carefully before
approving or rejecting this request.
"""
return message
@staticmethod
def can_ai_delete_automatically() -> bool:
"""
Check if AI is allowed to delete automatically.
According to agents.md, AI should NEVER delete without user approval.
This method always returns False as a safety measure.
Returns:
Always False - AI cannot auto-delete
"""
return False
__all__ = ['AIDeletionManager']
__all__ = ["AIDeletionManager"]
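Taken together, the class enforces a request-then-approve flow. A hypothetical usage sketch, assuming a configured Django environment; the module path documents.ai_deletion, the username, the tag name, and the reason string are all assumptions for illustration:

from django.contrib.auth.models import User

from documents.ai_deletion import AIDeletionManager
from documents.models import Document

user = User.objects.get(username="admin")                      # hypothetical user
docs = list(Document.objects.filter(tags__name="duplicate"))   # hypothetical tag

# The AI may only *request* deletion; nothing is removed until the user approves.
request = AIDeletionManager.create_deletion_request(
    documents=docs,
    reason="Detected exact duplicates of already-archived documents.",
    user=user,
)
print(AIDeletionManager.format_deletion_request_for_user(request))

# Hard safety guarantee from agents.md:
assert AIDeletionManager.can_ai_delete_automatically() is False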

@@ -20,21 +20,16 @@ According to agents.md requirements:
from __future__ import annotations
import logging
from typing import TYPE_CHECKING, Dict, List, Optional, Any, Tuple
from typing import TYPE_CHECKING
from typing import Any
from django.conf import settings
from django.db import transaction
if TYPE_CHECKING:
from documents.models import (
Document,
Tag,
Correspondent,
DocumentType,
StoragePath,
CustomField,
Workflow,
)
from documents.models import CustomField
from documents.models import Document
from documents.models import Workflow
logger = logging.getLogger("paperless.ai_scanner")
@@ -45,17 +40,25 @@ class AIScanResult:
"""
def __init__(self):
self.tags: List[Tuple[int, float]] = [] # [(tag_id, confidence), ...]
self.correspondent: Optional[Tuple[int, float]] = None # (correspondent_id, confidence)
self.document_type: Optional[Tuple[int, float]] = None # (document_type_id, confidence)
self.storage_path: Optional[Tuple[int, float]] = None # (storage_path_id, confidence)
self.custom_fields: Dict[int, Tuple[Any, float]] = {} # {field_id: (value, confidence), ...}
self.workflows: List[Tuple[int, float]] = [] # [(workflow_id, confidence), ...]
self.extracted_entities: Dict[str, Any] = {} # NER results
self.title_suggestion: Optional[str] = None
self.metadata: Dict[str, Any] = {} # Additional metadata
self.tags: list[tuple[int, float]] = [] # [(tag_id, confidence), ...]
self.correspondent: tuple[int, float] | None = (
None # (correspondent_id, confidence)
)
self.document_type: tuple[int, float] | None = (
None # (document_type_id, confidence)
)
self.storage_path: tuple[int, float] | None = (
None # (storage_path_id, confidence)
)
self.custom_fields: dict[
int, tuple[Any, float],
] = {} # {field_id: (value, confidence), ...}
self.workflows: list[tuple[int, float]] = [] # [(workflow_id, confidence), ...]
self.extracted_entities: dict[str, Any] = {} # NER results
self.title_suggestion: str | None = None
self.metadata: dict[str, Any] = {} # Additional metadata
def to_dict(self) -> Dict[str, Any]:
def to_dict(self) -> dict[str, Any]:
"""Convert scan results to dictionary for logging/serialization."""
return {
"tags": self.tags,
@@ -73,7 +76,7 @@ class AIScanResult:
class AIDocumentScanner:
"""
Comprehensive AI scanner for automatic document metadata management.
This scanner integrates all ML/AI capabilities to provide automatic:
- Tag assignment based on content analysis
- Correspondent detection from document text
@@ -81,7 +84,7 @@ class AIDocumentScanner:
- Storage path suggestion based on content/type
- Custom field extraction using NER
- Workflow assignment based on document characteristics
Features:
- High confidence threshold (>80%) for automatic application
- Medium confidence (60-80%) for suggestions requiring user review
@@ -99,7 +102,7 @@ class AIDocumentScanner:
):
"""
Initialize AI scanner.
Args:
auto_apply_threshold: Confidence threshold for automatic application (default: 0.80)
suggest_threshold: Confidence threshold for suggestions (default: 0.60)
@@ -108,7 +111,7 @@ class AIDocumentScanner:
"""
self.auto_apply_threshold = auto_apply_threshold
self.suggest_threshold = suggest_threshold
# Check settings for ML/OCR enablement
self.ml_enabled = (
enable_ml_features
@@ -120,16 +123,16 @@ class AIDocumentScanner:
if enable_advanced_ocr is not None
else getattr(settings, "PAPERLESS_ENABLE_ADVANCED_OCR", True)
)
# Lazy loading of ML components
self._classifier = None
self._ner_extractor = None
self._semantic_search = None
self._table_extractor = None
logger.info(
f"AIDocumentScanner initialized - ML: {self.ml_enabled}, "
f"Advanced OCR: {self.advanced_ocr_enabled}"
f"Advanced OCR: {self.advanced_ocr_enabled}",
)
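The two thresholds configured above split every suggestion into three tiers. A standalone sketch of that triage; the tier names are illustrative, and the 0.80/0.60 defaults mirror the constructor:

def triage(confidence: float, auto_apply: float = 0.80, suggest: float = 0.60) -> str:
    if confidence >= auto_apply:
        return "apply"    # applied automatically during consumption
    if confidence >= suggest:
        return "suggest"  # surfaced to the user for review
    return "discard"      # below the suggestion floor


print(triage(0.85))  # apply
print(triage(0.70))  # suggest
print(triage(0.40))  # discard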
def _get_classifier(self):
@@ -137,6 +140,7 @@ class AIDocumentScanner:
if self._classifier is None and self.ml_enabled:
try:
from documents.ml.classifier import TransformerDocumentClassifier
self._classifier = TransformerDocumentClassifier()
logger.info("ML classifier loaded successfully")
except Exception as e:
@@ -149,6 +153,7 @@ class AIDocumentScanner:
if self._ner_extractor is None and self.ml_enabled:
try:
from documents.ml.ner import DocumentNER
self._ner_extractor = DocumentNER()
logger.info("NER extractor loaded successfully")
except Exception as e:
@@ -160,6 +165,7 @@ class AIDocumentScanner:
if self._semantic_search is None and self.ml_enabled:
try:
from documents.ml.semantic_search import SemanticSearch
self._semantic_search = SemanticSearch()
logger.info("Semantic search loaded successfully")
except Exception as e:
@@ -171,6 +177,7 @@ class AIDocumentScanner:
if self._table_extractor is None and self.advanced_ocr_enabled:
try:
from documents.ocr.table_extractor import TableExtractor
self._table_extractor = TableExtractor()
logger.info("Table extractor loaded successfully")
except Exception as e:
@@ -185,90 +192,108 @@ class AIDocumentScanner:
) -> AIScanResult:
"""
Perform comprehensive AI scan of a document.
This is the main entry point for document scanning. It orchestrates
all AI/ML components to analyze the document and generate suggestions.
Args:
document: The Document model instance
document_text: The extracted text content
original_file_path: Path to original file (for OCR/image analysis)
Returns:
AIScanResult containing all suggestions and extracted data
"""
logger.info(f"Starting AI scan for document: {document.title} (ID: {document.pk})")
logger.info(
f"Starting AI scan for document: {document.title} (ID: {document.pk})",
)
result = AIScanResult()
# Extract entities using NER
result.extracted_entities = self._extract_entities(document_text)
# Analyze and suggest tags
result.tags = self._suggest_tags(document, document_text, result.extracted_entities)
result.tags = self._suggest_tags(
document, document_text, result.extracted_entities,
)
# Detect correspondent
result.correspondent = self._detect_correspondent(
document, document_text, result.extracted_entities
document,
document_text,
result.extracted_entities,
)
# Classify document type
result.document_type = self._classify_document_type(
document, document_text, result.extracted_entities
document,
document_text,
result.extracted_entities,
)
# Suggest storage path
result.storage_path = self._suggest_storage_path(
document, document_text, result
document,
document_text,
result,
)
# Extract custom fields
result.custom_fields = self._extract_custom_fields(
document, document_text, result.extracted_entities
document,
document_text,
result.extracted_entities,
)
# Suggest workflows
result.workflows = self._suggest_workflows(document, document_text, result)
# Generate improved title suggestion
result.title_suggestion = self._suggest_title(
document, document_text, result.extracted_entities
document,
document_text,
result.extracted_entities,
)
# Extract tables if advanced OCR enabled
if self.advanced_ocr_enabled and original_file_path:
result.metadata["tables"] = self._extract_tables(original_file_path)
logger.info(f"AI scan completed for document {document.pk}")
logger.debug(f"Scan results: {result.to_dict()}")
return result
def _extract_entities(self, text: str) -> Dict[str, Any]:
def _extract_entities(self, text: str) -> dict[str, Any]:
"""
Extract named entities from document text using NER.
Returns:
Dictionary with extracted entities (persons, orgs, dates, amounts, etc.)
"""
ner = self._get_ner_extractor()
if not ner:
return {}
try:
# Use extract_all to get comprehensive entity extraction
entities = ner.extract_all(text)
# Convert string lists to dict format for consistency
for key in ["persons", "organizations", "locations", "misc"]:
if key in entities and isinstance(entities[key], list):
entities[key] = [{"text": e} if isinstance(e, str) else e for e in entities[key]]
entities[key] = [
{"text": e} if isinstance(e, str) else e for e in entities[key]
]
for key in ["dates", "amounts"]:
if key in entities and isinstance(entities[key], list):
entities[key] = [{"text": e} if isinstance(e, str) else e for e in entities[key]]
logger.debug(f"Extracted entities from NER")
entities[key] = [
{"text": e} if isinstance(e, str) else e for e in entities[key]
]
logger.debug("Extracted entities from NER")
return entities
except Exception as e:
logger.error(f"Entity extraction failed: {e}", exc_info=True)
@@ -278,156 +303,157 @@ class AIDocumentScanner:
self,
document: Document,
text: str,
entities: Dict[str, Any],
) -> List[Tuple[int, float]]:
entities: dict[str, Any],
) -> list[tuple[int, float]]:
"""
Suggest relevant tags based on document content and entities.
Uses a combination of:
- Keyword matching with existing tag patterns
- ML classification if available
- Entity-based suggestions (e.g., organization -> company tag)
Returns:
List of (tag_id, confidence) tuples
"""
from documents.models import Tag
from documents.matching import match_tags
from documents.models import Tag
suggestions = []
try:
# Use existing matching logic
matched_tags = match_tags(document, self._get_classifier())
# Add confidence scores based on matching strength
for tag in matched_tags:
confidence = 0.85 # High confidence for matched tags
suggestions.append((tag.id, confidence))
# Additional entity-based suggestions
if entities:
# Suggest tags based on detected entities
all_tags = Tag.objects.all()
# Check for organization entities -> company/business tags
if entities.get("organizations"):
for tag in all_tags.filter(name__icontains="company"):
suggestions.append((tag.id, 0.70))
# Check for date entities -> tax/financial tags if year-end
if entities.get("dates"):
for tag in all_tags.filter(name__icontains="tax"):
suggestions.append((tag.id, 0.65))
# Remove duplicates, keep highest confidence
seen = {}
for tag_id, conf in suggestions:
if tag_id not in seen or conf > seen[tag_id]:
seen[tag_id] = conf
suggestions = [(tid, conf) for tid, conf in seen.items()]
suggestions.sort(key=lambda x: x[1], reverse=True)
logger.debug(f"Suggested {len(suggestions)} tags")
except Exception as e:
logger.error(f"Tag suggestion failed: {e}", exc_info=True)
return suggestions
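The de-duplication step above keeps only the highest confidence per tag and sorts descending. A standalone sketch of that logic:

def dedupe(suggestions: list) -> list:
    seen = {}
    for tag_id, conf in suggestions:
        if tag_id not in seen or conf > seen[tag_id]:
            seen[tag_id] = conf
    return sorted(seen.items(), key=lambda x: x[1], reverse=True)


print(dedupe([(1, 0.85), (2, 0.70), (1, 0.65)]))  # [(1, 0.85), (2, 0.70)]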
def _detect_correspondent(
self,
document: Document,
text: str,
entities: Dict[str, Any],
) -> Optional[Tuple[int, float]]:
entities: dict[str, Any],
) -> tuple[int, float] | None:
"""
Detect correspondent based on document content and entities.
Uses:
- Organization entities from NER
- Email domains
- Existing correspondent matching patterns
Returns:
(correspondent_id, confidence) or None
"""
from documents.models import Correspondent
from documents.matching import match_correspondents
from documents.models import Correspondent
try:
# Use existing matching logic
matched_correspondents = match_correspondents(document, self._get_classifier())
matched_correspondents = match_correspondents(
document, self._get_classifier(),
)
if matched_correspondents:
correspondent = matched_correspondents[0]
confidence = 0.85
logger.debug(
f"Detected correspondent: {correspondent.name} "
f"(confidence: {confidence})"
f"(confidence: {confidence})",
)
return (correspondent.id, confidence)
# Try to match based on NER organizations
if entities.get("organizations"):
org_name = entities["organizations"][0]["text"]
# Try to find existing correspondent with similar name
correspondents = Correspondent.objects.filter(
name__icontains=org_name[:20] # First 20 chars
name__icontains=org_name[:20], # First 20 chars
)
if correspondents.exists():
correspondent = correspondents.first()
confidence = 0.70
logger.debug(
f"Detected correspondent from NER: {correspondent.name} "
f"(confidence: {confidence})"
f"(confidence: {confidence})",
)
return (correspondent.id, confidence)
except Exception as e:
logger.error(f"Correspondent detection failed: {e}", exc_info=True)
return None
def _classify_document_type(
self,
document: Document,
text: str,
entities: Dict[str, Any],
) -> Optional[Tuple[int, float]]:
entities: dict[str, Any],
) -> tuple[int, float] | None:
"""
Classify document type using ML and content analysis.
Returns:
(document_type_id, confidence) or None
"""
from documents.models import DocumentType
from documents.matching import match_document_types
try:
# Use existing matching logic
matched_types = match_document_types(document, self._get_classifier())
if matched_types:
doc_type = matched_types[0]
confidence = 0.85
logger.debug(
f"Classified document type: {doc_type.name} "
f"(confidence: {confidence})"
f"(confidence: {confidence})",
)
return (doc_type.id, confidence)
# ML-based classification if available
classifier = self._get_classifier()
if classifier and hasattr(classifier, "predict"):
# This would need a trained model with document type labels
# For now, fall back to pattern matching
pass
except Exception as e:
logger.error(f"Document type classification failed: {e}", exc_info=True)
return None
def _suggest_storage_path(
@@ -435,127 +461,131 @@ class AIDocumentScanner:
document: Document,
text: str,
scan_result: AIScanResult,
) -> Optional[Tuple[int, float]]:
) -> tuple[int, float] | None:
"""
Suggest appropriate storage path based on document characteristics.
Returns:
(storage_path_id, confidence) or None
"""
from documents.models import StoragePath
from documents.matching import match_storage_paths
try:
# Use existing matching logic
matched_paths = match_storage_paths(document, self._get_classifier())
if matched_paths:
storage_path = matched_paths[0]
confidence = 0.80
logger.debug(
f"Suggested storage path: {storage_path.name} "
f"(confidence: {confidence})"
f"(confidence: {confidence})",
)
return (storage_path.id, confidence)
except Exception as e:
logger.error(f"Storage path suggestion failed: {e}", exc_info=True)
return None
def _extract_custom_fields(
self,
document: Document,
text: str,
entities: Dict[str, Any],
) -> Dict[int, Tuple[Any, float]]:
entities: dict[str, Any],
) -> dict[int, tuple[Any, float]]:
"""
Extract values for custom fields using NER and pattern matching.
Returns:
Dictionary mapping field_id to (value, confidence)
"""
from documents.models import CustomField
extracted_fields = {}
try:
custom_fields = CustomField.objects.all()
for field in custom_fields:
# Try to extract field value based on field name and type
value, confidence = self._extract_field_value(
field, text, entities
field,
text,
entities,
)
if value is not None and confidence >= self.suggest_threshold:
extracted_fields[field.id] = (value, confidence)
logger.debug(
f"Extracted custom field '{field.name}': {value} "
f"(confidence: {confidence})"
f"(confidence: {confidence})",
)
except Exception as e:
logger.error(f"Custom field extraction failed: {e}", exc_info=True)
return extracted_fields
def _extract_field_value(
self,
field: CustomField,
text: str,
entities: Dict[str, Any],
) -> Tuple[Any, float]:
entities: dict[str, Any],
) -> tuple[Any, float]:
"""
Extract a single custom field value.
Returns:
(value, confidence) tuple
"""
field_name_lower = field.name.lower()
# Date fields
if "date" in field_name_lower:
dates = entities.get("dates", [])
if dates:
return (dates[0]["text"], 0.75)
# Amount/price fields
if any(keyword in field_name_lower for keyword in ["amount", "price", "cost", "total"]):
if any(
keyword in field_name_lower
for keyword in ["amount", "price", "cost", "total"]
):
amounts = entities.get("amounts", [])
if amounts:
return (amounts[0]["text"], 0.75)
# Invoice number fields
if "invoice" in field_name_lower:
invoice_numbers = entities.get("invoice_numbers", [])
if invoice_numbers:
return (invoice_numbers[0], 0.80)
# Email fields
if "email" in field_name_lower:
emails = entities.get("emails", [])
if emails:
return (emails[0], 0.85)
# Phone fields
if "phone" in field_name_lower:
phones = entities.get("phones", [])
if phones:
return (phones[0], 0.85)
# Person name fields
if "name" in field_name_lower or "person" in field_name_lower:
persons = entities.get("persons", [])
if persons:
return (persons[0]["text"], 0.70)
# Organization fields
if "company" in field_name_lower or "organization" in field_name_lower:
orgs = entities.get("organizations", [])
if orgs:
return (orgs[0]["text"], 0.70)
return (None, 0.0)
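Note that the branches above run in a fixed order, so a field named "Invoice date" matches the date branch before the invoice branch is ever reached. A trimmed standalone sketch of just those two branches, with made-up entity values:

def pick(field_name: str, entities: dict):
    name = field_name.lower()
    if "date" in name and entities.get("dates"):
        return (entities["dates"][0]["text"], 0.75)
    if "invoice" in name and entities.get("invoice_numbers"):
        return (entities["invoice_numbers"][0], 0.80)
    return (None, 0.0)


entities = {"dates": [{"text": "2025-11-12"}], "invoice_numbers": ["INV-0042"]}
print(pick("Invoice date", entities))    # ('2025-11-12', 0.75) -- date branch wins
print(pick("Invoice number", entities))  # ('INV-0042', 0.80)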
def _suggest_workflows(
@@ -563,40 +593,43 @@ class AIDocumentScanner:
document: Document,
text: str,
scan_result: AIScanResult,
) -> List[Tuple[int, float]]:
) -> list[tuple[int, float]]:
"""
Suggest relevant workflows based on document characteristics.
Returns:
List of (workflow_id, confidence) tuples
"""
from documents.models import Workflow, WorkflowTrigger
from documents.models import Workflow
from documents.models import WorkflowTrigger
suggestions = []
try:
# Get all workflows with consumption triggers
workflows = Workflow.objects.filter(
enabled=True,
triggers__type=WorkflowTrigger.WorkflowTriggerType.CONSUMPTION,
).distinct()
for workflow in workflows:
# Evaluate workflow conditions against scan results
confidence = self._evaluate_workflow_match(
workflow, document, scan_result
workflow,
document,
scan_result,
)
if confidence >= self.suggest_threshold:
suggestions.append((workflow.id, confidence))
logger.debug(
f"Suggested workflow: {workflow.name} "
f"(confidence: {confidence})"
f"(confidence: {confidence})",
)
except Exception as e:
logger.error(f"Workflow suggestion failed: {e}", exc_info=True)
return suggestions
def _evaluate_workflow_match(
@@ -607,80 +640,80 @@ class AIDocumentScanner:
) -> float:
"""
Evaluate how well a workflow matches the document.
Returns:
Confidence score (0.0 to 1.0)
"""
# This is a simplified evaluation
# In practice, you'd check workflow triggers and conditions
confidence = 0.5 # Base confidence
# Increase confidence if document type matches workflow expectations
if scan_result.document_type and workflow.actions.exists():
confidence += 0.2
# Increase confidence if correspondent matches
if scan_result.correspondent:
confidence += 0.15
# Increase confidence if tags match
if scan_result.tags:
confidence += 0.15
return min(confidence, 1.0)
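The additive scoring above starts at a base of 0.5 and tops out at 1.0 (0.5 + 0.2 + 0.15 + 0.15); a correspondent match alone yields 0.65, which already clears the 0.60 suggest threshold. A standalone sketch:

def workflow_confidence(has_type: bool, has_correspondent: bool, has_tags: bool) -> float:
    confidence = 0.5                                  # base
    confidence += 0.2 if has_type else 0.0            # document type matches
    confidence += 0.15 if has_correspondent else 0.0  # correspondent matches
    confidence += 0.15 if has_tags else 0.0           # tags match
    return min(confidence, 1.0)


print(workflow_confidence(True, True, True))    # 1.0 (capped)
print(workflow_confidence(False, True, False))  # 0.65 -- clears the 0.60 suggest threshold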
def _suggest_title(
self,
document: Document,
text: str,
entities: Dict[str, Any],
) -> Optional[str]:
entities: dict[str, Any],
) -> str | None:
"""
Generate an improved title suggestion based on document content.
Returns:
Suggested title or None
"""
try:
# Extract key information for title
title_parts = []
# Add document type if detected
if entities.get("document_type"):
title_parts.append(entities["document_type"])
# Add primary organization
orgs = entities.get("organizations", [])
if orgs:
title_parts.append(orgs[0]["text"][:30]) # Limit length
# Add date if available
dates = entities.get("dates", [])
if dates:
title_parts.append(dates[0]["text"])
if title_parts:
suggested_title = " - ".join(title_parts)
logger.debug(f"Generated title suggestion: {suggested_title}")
return suggested_title[:127] # Respect title length limit
except Exception as e:
logger.error(f"Title suggestion failed: {e}", exc_info=True)
return None
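The title builder above joins the detected type, organization (trimmed to 30 characters), and date with " - " and caps the result at 127 characters. A standalone sketch with made-up values:

from __future__ import annotations


def build_title(doc_type: str | None, org: str | None, date: str | None) -> str | None:
    parts = [p for p in (doc_type, org and org[:30], date) if p]
    return " - ".join(parts)[:127] if parts else None


print(build_title("Invoice", "ACME Corporation GmbH", "2025-11-12"))
# Invoice - ACME Corporation GmbH - 2025-11-12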
def _extract_tables(self, file_path: str) -> List[Dict[str, Any]]:
def _extract_tables(self, file_path: str) -> list[dict[str, Any]]:
"""
Extract tables from document using advanced OCR.
Returns:
List of extracted tables with data and metadata
"""
extractor = self._get_table_extractor()
if not extractor:
return []
try:
tables = extractor.extract_tables_from_image(file_path)
logger.debug(f"Extracted {len(tables)} tables from document")
@@ -695,21 +728,24 @@ class AIDocumentScanner:
scan_result: AIScanResult,
auto_apply: bool = True,
user_confirmed: bool = False,
) -> Dict[str, Any]:
) -> dict[str, Any]:
"""
Apply AI scan results to document.
Args:
document: Document to update
scan_result: AI scan results
auto_apply: Whether to auto-apply high confidence suggestions
user_confirmed: Whether user has confirmed low-confidence changes
Returns:
Dictionary with applied changes and pending suggestions
"""
from documents.models import Tag, Correspondent, DocumentType, StoragePath
from documents.models import Correspondent
from documents.models import DocumentType
from documents.models import StoragePath
from documents.models import Tag
applied = {
"tags": [],
"correspondent": None,
@@ -717,7 +753,7 @@ class AIDocumentScanner:
"storage_path": None,
"custom_fields": {},
}
suggestions = {
"tags": [],
"correspondent": None,
@@ -725,7 +761,7 @@ class AIDocumentScanner:
"storage_path": None,
"custom_fields": {},
}
try:
with transaction.atomic():
# Apply tags
@@ -737,12 +773,14 @@ class AIDocumentScanner:
logger.info(f"Auto-applied tag: {tag.name}")
elif confidence >= self.suggest_threshold:
tag = Tag.objects.get(pk=tag_id)
suggestions["tags"].append({
"id": tag_id,
"name": tag.name,
"confidence": confidence,
})
suggestions["tags"].append(
{
"id": tag_id,
"name": tag.name,
"confidence": confidence,
},
)
# Apply correspondent
if scan_result.correspondent:
corr_id, confidence = scan_result.correspondent
@@ -761,7 +799,7 @@ class AIDocumentScanner:
"name": correspondent.name,
"confidence": confidence,
}
# Apply document type
if scan_result.document_type:
type_id, confidence = scan_result.document_type
@@ -780,7 +818,7 @@ class AIDocumentScanner:
"name": doc_type.name,
"confidence": confidence,
}
# Apply storage path
if scan_result.storage_path:
path_id, confidence = scan_result.storage_path
@@ -799,13 +837,13 @@ class AIDocumentScanner:
"name": storage_path.name,
"confidence": confidence,
}
# Save document with changes
document.save()
except Exception as e:
logger.error(f"Failed to apply scan results: {e}", exc_info=True)
return {
"applied": applied,
"suggestions": suggestions,
@@ -819,7 +857,7 @@ _scanner_instance = None
def get_ai_scanner() -> AIDocumentScanner:
"""
Get or create the global AI scanner instance.
Returns:
AIDocumentScanner instance
"""

@@ -756,22 +756,22 @@ class ConsumerPlugin(
def _run_ai_scanner(self, document, text):
"""
Run AI scanner on the document to automatically detect and apply metadata.
This is called during document consumption to leverage AI/ML capabilities
for automatic metadata management as specified in agents.md.
Args:
document: The Document model instance
text: The extracted document text
"""
try:
from documents.ai_scanner import get_ai_scanner
scanner = get_ai_scanner()
# Get the original file path if available
original_file_path = str(self.working_copy) if self.working_copy else None
# Perform comprehensive AI scan
self.log.info(f"Running AI scanner on document: {document.title}")
scan_result = scanner.scan_document(
@@ -779,65 +779,65 @@ class ConsumerPlugin(
document_text=text,
original_file_path=original_file_path,
)
# Apply scan results (auto-apply high confidence, suggest medium confidence)
results = scanner.apply_scan_results(
document=document,
scan_result=scan_result,
auto_apply=True, # Auto-apply high confidence suggestions
)
# Log what was applied and suggested
if results["applied"]["tags"]:
self.log.info(
f"AI auto-applied tags: {[t['name'] for t in results['applied']['tags']]}"
f"AI auto-applied tags: {[t['name'] for t in results['applied']['tags']]}",
)
if results["applied"]["correspondent"]:
self.log.info(
f"AI auto-applied correspondent: {results['applied']['correspondent']['name']}"
f"AI auto-applied correspondent: {results['applied']['correspondent']['name']}",
)
if results["applied"]["document_type"]:
self.log.info(
f"AI auto-applied document type: {results['applied']['document_type']['name']}"
f"AI auto-applied document type: {results['applied']['document_type']['name']}",
)
if results["applied"]["storage_path"]:
self.log.info(
f"AI auto-applied storage path: {results['applied']['storage_path']['name']}"
f"AI auto-applied storage path: {results['applied']['storage_path']['name']}",
)
# Log suggestions for user review
if results["suggestions"]["tags"]:
self.log.info(
f"AI suggested tags (require review): "
f"{[t['name'] for t in results['suggestions']['tags']]}"
f"{[t['name'] for t in results['suggestions']['tags']]}",
)
if results["suggestions"]["correspondent"]:
self.log.info(
f"AI suggested correspondent (requires review): "
f"{results['suggestions']['correspondent']['name']}"
f"{results['suggestions']['correspondent']['name']}",
)
if results["suggestions"]["document_type"]:
self.log.info(
f"AI suggested document type (requires review): "
f"{results['suggestions']['document_type']['name']}"
f"{results['suggestions']['document_type']['name']}",
)
if results["suggestions"]["storage_path"]:
self.log.info(
f"AI suggested storage path (requires review): "
f"{results['suggestions']['storage_path']['name']}"
f"{results['suggestions']['storage_path']['name']}",
)
# Store suggestions in document metadata for UI to display
# This allows the frontend to show AI suggestions to users
if not hasattr(document, '_ai_suggestions'):
if not hasattr(document, "_ai_suggestions"):
document._ai_suggestions = results["suggestions"]
except ImportError:
# AI scanner not available, skip
self.log.debug("AI scanner not available, skipping AI analysis")
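For reference, the payload stashed on document._ai_suggestions mirrors the "suggestions" dictionary built by apply_scan_results; the values below are made up purely to show the shape the frontend would receive:

ai_suggestions = {
    "tags": [{"id": 7, "name": "invoice", "confidence": 0.72}],
    "correspondent": {"id": 3, "name": "ACME GmbH", "confidence": 0.70},
    "document_type": None,   # nothing cleared the 0.60 suggest threshold
    "storage_path": None,
    "custom_fields": {},
}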