From cc9e66c11c05e6fad07c754f403d82e023d34e43 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 12 Nov 2025 15:39:22 +0000 Subject: [PATCH] Changes before error encountered Co-authored-by: dawnsystem <42047891+dawnsystem@users.noreply.github.com> --- .../management/commands/scan_documents_ai.py | 573 ++++++++++++++++++ .../tests/test_management_scan_ai.py | 442 ++++++++++++++ 2 files changed, 1015 insertions(+) create mode 100644 src/documents/management/commands/scan_documents_ai.py create mode 100644 src/documents/tests/test_management_scan_ai.py diff --git a/src/documents/management/commands/scan_documents_ai.py b/src/documents/management/commands/scan_documents_ai.py new file mode 100644 index 000000000..62abb4a2b --- /dev/null +++ b/src/documents/management/commands/scan_documents_ai.py @@ -0,0 +1,573 @@ +""" +Management command to apply AI scanner to existing documents. + +This command allows batch processing of documents through the AI scanner, +enabling metadata suggestions for documents that were added before the +AI scanner was implemented or to re-scan documents with updated AI models. +""" + +import logging +from datetime import datetime +from typing import Any + +import tqdm +from django.core.management.base import BaseCommand +from django.core.management.base import CommandError +from django.utils import timezone + +from documents.ai_scanner import AIScanResult +from documents.ai_scanner import get_ai_scanner +from documents.management.commands.mixins import ProgressBarMixin +from documents.models import Document +from documents.models import DocumentType +from documents.models import Tag + +logger = logging.getLogger("paperless.management.scan_documents_ai") + + +class Command(ProgressBarMixin, BaseCommand): + """ + Management command to apply AI scanner to existing documents. + + This command processes existing documents through the comprehensive AI scanner + to generate metadata suggestions (tags, correspondents, document types, etc.). + """ + + help = ( + "Apply AI scanner to existing documents to generate metadata suggestions. " + "Supports filtering by document type, date range, and auto-apply for high " + "confidence suggestions. Use --dry-run to preview suggestions without applying." + ) + + def add_arguments(self, parser): + """Add command line arguments.""" + # Filtering options + parser.add_argument( + "--all", + action="store_true", + default=False, + help="Scan all documents in the system", + ) + + parser.add_argument( + "--filter-by-type", + type=int, + nargs="+", + metavar="TYPE_ID", + help="Filter documents by document type ID(s). Can specify multiple IDs.", + ) + + parser.add_argument( + "--date-range", + nargs=2, + metavar=("START_DATE", "END_DATE"), + help=( + "Filter documents by creation date range. " + "Format: YYYY-MM-DD YYYY-MM-DD. Example: 2024-01-01 2024-12-31" + ), + ) + + parser.add_argument( + "--id-range", + nargs=2, + type=int, + metavar=("START_ID", "END_ID"), + help="Filter documents by ID range. Example: 1 100", + ) + + # Processing options + parser.add_argument( + "--dry-run", + action="store_true", + default=False, + help="Preview suggestions without applying any changes", + ) + + parser.add_argument( + "--auto-apply-high-confidence", + action="store_true", + default=False, + help=( + "Automatically apply suggestions with high confidence (>=80%%). " + "Lower confidence suggestions will still be shown for review." 
+ ), + ) + + parser.add_argument( + "--confidence-threshold", + type=float, + default=0.60, + help=( + "Minimum confidence threshold for showing suggestions (0.0-1.0). " + "Default: 0.60 (60%%)" + ), + ) + + # Progress bar + self.add_argument_progress_bar_mixin(parser) + + # Batch size for processing + parser.add_argument( + "--batch-size", + type=int, + default=100, + help="Number of documents to process in memory at once. Default: 100", + ) + + def handle(self, *args, **options): + """Execute the command.""" + self.handle_progress_bar_mixin(**options) + + # Validate arguments + self._validate_arguments(options) + + # Get queryset based on filters + queryset = self._build_queryset(options) + document_count = queryset.count() + + if document_count == 0: + self.stdout.write( + self.style.WARNING("No documents found matching the specified filters."), + ) + return + + # Initialize AI scanner + try: + scanner = get_ai_scanner() + except Exception as e: + raise CommandError(f"Failed to initialize AI scanner: {e}") + + # Display operation summary + self._display_operation_summary(options, document_count) + + # Process documents + results = self._process_documents( + queryset=queryset, + scanner=scanner, + options=options, + ) + + # Display final summary + self._display_final_summary(results, options) + + def _validate_arguments(self, options): + """Validate command line arguments.""" + # At least one filter must be specified + if not any([ + options["all"], + options["filter_by_type"], + options["date_range"], + options["id_range"], + ]): + raise CommandError( + "You must specify at least one filter: " + "--all, --filter-by-type, --date-range, or --id-range", + ) + + # Validate confidence threshold + if not 0.0 <= options["confidence_threshold"] <= 1.0: + raise CommandError("Confidence threshold must be between 0.0 and 1.0") + + # Validate date range format + if options["date_range"]: + try: + start_str, end_str = options["date_range"] + start_date = datetime.strptime(start_str, "%Y-%m-%d") + end_date = datetime.strptime(end_str, "%Y-%m-%d") + + if start_date > end_date: + raise CommandError("Start date must be before end date") + + # Store parsed dates for later use + options["_parsed_start_date"] = timezone.make_aware(start_date) + options["_parsed_end_date"] = timezone.make_aware( + end_date.replace(hour=23, minute=59, second=59), + ) + except ValueError as e: + raise CommandError( + f"Invalid date format. Use YYYY-MM-DD. 
Error: {e}", + ) + + # Validate document types exist + if options["filter_by_type"]: + for type_id in options["filter_by_type"]: + if not DocumentType.objects.filter(pk=type_id).exists(): + raise CommandError( + f"Document type with ID {type_id} does not exist", + ) + + def _build_queryset(self, options): + """Build document queryset based on filters.""" + queryset = Document.objects.all() + + # Filter by document type + if options["filter_by_type"]: + queryset = queryset.filter(document_type__id__in=options["filter_by_type"]) + + # Filter by date range + if options["date_range"]: + queryset = queryset.filter( + created__gte=options["_parsed_start_date"], + created__lte=options["_parsed_end_date"], + ) + + # Filter by ID range + if options["id_range"]: + start_id, end_id = options["id_range"] + queryset = queryset.filter(id__gte=start_id, id__lte=end_id) + + # Order by ID for consistent processing + return queryset.order_by("id") + + def _display_operation_summary(self, options, document_count): + """Display summary of the operation before starting.""" + self.stdout.write(self.style.SUCCESS("\n" + "=" * 70)) + self.stdout.write(self.style.SUCCESS("AI Document Scanner - Batch Processing")) + self.stdout.write(self.style.SUCCESS("=" * 70 + "\n")) + + # Display filters + self.stdout.write("Filters applied:") + if options["all"]: + self.stdout.write(" • Processing ALL documents") + if options["filter_by_type"]: + type_ids = ", ".join(str(tid) for tid in options["filter_by_type"]) + self.stdout.write(f" • Document types: {type_ids}") + if options["date_range"]: + start, end = options["date_range"] + self.stdout.write(f" • Date range: {start} to {end}") + if options["id_range"]: + start, end = options["id_range"] + self.stdout.write(f" • ID range: {start} to {end}") + + # Display processing mode + self.stdout.write("\nProcessing mode:") + if options["dry_run"]: + self.stdout.write(self.style.WARNING(" • DRY RUN - No changes will be applied")) + elif options["auto_apply_high_confidence"]: + self.stdout.write(" • Auto-apply high confidence suggestions (≥80%)") + else: + self.stdout.write(" • Preview mode - No changes will be applied") + + self.stdout.write( + f" • Confidence threshold: {options['confidence_threshold']:.0%}", + ) + + # Display document count + self.stdout.write( + f"\n{self.style.SUCCESS('Documents to process:')} {document_count}", + ) + self.stdout.write("\n" + "=" * 70 + "\n") + + def _process_documents( + self, + queryset, + scanner, + options, + ) -> dict[str, Any]: + """ + Process documents through the AI scanner. 
+ + Returns: + Dictionary with processing results and statistics + """ + results = { + "processed": 0, + "errors": 0, + "suggestions_generated": 0, + "auto_applied": 0, + "documents_with_suggestions": [], + "error_documents": [], + } + + batch_size = options["batch_size"] + confidence_threshold = options["confidence_threshold"] + auto_apply = options["auto_apply_high_confidence"] and not options["dry_run"] + + # Process in batches + total_docs = queryset.count() + + for i in tqdm.tqdm( + range(0, total_docs, batch_size), + disable=self.no_progress_bar, + desc="Processing batches", + ): + batch = queryset[i:i + batch_size] + + for document in batch: + try: + # Get document text + document_text = document.content or "" + + if not document_text: + logger.warning( + f"Document {document.id} has no text content, skipping", + ) + continue + + # Scan document + scan_result = scanner.scan_document( + document=document, + document_text=document_text, + ) + + # Filter results by confidence threshold + filtered_result = self._filter_by_confidence( + scan_result, + confidence_threshold, + ) + + # Count suggestions + suggestion_count = self._count_suggestions(filtered_result) + + if suggestion_count > 0: + results["suggestions_generated"] += suggestion_count + + # Apply or store suggestions + if auto_apply: + applied = scanner.apply_scan_results( + document=document, + scan_result=filtered_result, + auto_apply=True, + ) + results["auto_applied"] += len( + applied.get("applied", {}).get("tags", []), + ) + + # Store for summary + results["documents_with_suggestions"].append({ + "id": document.id, + "title": document.title, + "suggestions": filtered_result.to_dict(), + "applied": applied if auto_apply else None, + }) + + results["processed"] += 1 + + except Exception as e: + logger.error( + f"Error processing document {document.id}: {e}", + exc_info=True, + ) + results["errors"] += 1 + results["error_documents"].append({ + "id": document.id, + "title": document.title, + "error": str(e), + }) + + return results + + def _filter_by_confidence( + self, + scan_result: AIScanResult, + threshold: float, + ) -> AIScanResult: + """Filter scan results by confidence threshold.""" + filtered = AIScanResult() + + # Filter tags + filtered.tags = [ + (tag_id, conf) for tag_id, conf in scan_result.tags + if conf >= threshold + ] + + # Filter correspondent + if scan_result.correspondent: + corr_id, conf = scan_result.correspondent + if conf >= threshold: + filtered.correspondent = scan_result.correspondent + + # Filter document type + if scan_result.document_type: + type_id, conf = scan_result.document_type + if conf >= threshold: + filtered.document_type = scan_result.document_type + + # Filter storage path + if scan_result.storage_path: + path_id, conf = scan_result.storage_path + if conf >= threshold: + filtered.storage_path = scan_result.storage_path + + # Filter custom fields + for field_id, (value, conf) in scan_result.custom_fields.items(): + if conf >= threshold: + filtered.custom_fields[field_id] = (value, conf) + + # Filter workflows + filtered.workflows = [ + (wf_id, conf) for wf_id, conf in scan_result.workflows + if conf >= threshold + ] + + # Copy other fields as-is + filtered.extracted_entities = scan_result.extracted_entities + filtered.title_suggestion = scan_result.title_suggestion + filtered.metadata = scan_result.metadata + + return filtered + + def _count_suggestions(self, scan_result: AIScanResult) -> int: + """Count total number of suggestions in scan result.""" + count = 0 + count += 
len(scan_result.tags) + count += 1 if scan_result.correspondent else 0 + count += 1 if scan_result.document_type else 0 + count += 1 if scan_result.storage_path else 0 + count += len(scan_result.custom_fields) + count += len(scan_result.workflows) + count += 1 if scan_result.title_suggestion else 0 + return count + + def _display_final_summary(self, results: dict[str, Any], options): + """Display final summary of processing results.""" + self.stdout.write("\n" + "=" * 70) + self.stdout.write(self.style.SUCCESS("Processing Complete - Summary")) + self.stdout.write("=" * 70 + "\n") + + # Display statistics + self.stdout.write("Statistics:") + self.stdout.write(f" • Documents processed: {results['processed']}") + self.stdout.write(f" • Documents with suggestions: {len(results['documents_with_suggestions'])}") + self.stdout.write(f" • Total suggestions generated: {results['suggestions_generated']}") + + if options["auto_apply_high_confidence"] and not options["dry_run"]: + self.stdout.write( + self.style.SUCCESS(f" • Suggestions auto-applied: {results['auto_applied']}"), + ) + + if results["errors"] > 0: + self.stdout.write( + self.style.ERROR(f" • Errors encountered: {results['errors']}"), + ) + + # Display sample suggestions + if results["documents_with_suggestions"]: + self.stdout.write("\n" + "-" * 70) + self.stdout.write("Sample Suggestions (first 5 documents):\n") + + for doc_info in results["documents_with_suggestions"][:5]: + self._display_document_suggestions(doc_info, options) + + # Display errors + if results["error_documents"]: + self.stdout.write("\n" + "-" * 70) + self.stdout.write(self.style.ERROR("Errors:\n")) + + for error_info in results["error_documents"][:10]: + self.stdout.write( + f" • Document {error_info['id']}: {error_info['title']}", + ) + self.stdout.write(f" Error: {error_info['error']}") + + # Final message + self.stdout.write("\n" + "=" * 70) + if options["dry_run"]: + self.stdout.write( + self.style.WARNING( + "DRY RUN completed - No changes were applied to documents.", + ), + ) + elif options["auto_apply_high_confidence"]: + self.stdout.write( + self.style.SUCCESS( + f"Processing complete - {results['auto_applied']} high confidence " + "suggestions were automatically applied.", + ), + ) + else: + self.stdout.write( + self.style.SUCCESS( + "Processing complete - Suggestions generated. 
Use " + "--auto-apply-high-confidence to apply them automatically.", + ), + ) + self.stdout.write("=" * 70 + "\n") + + def _display_document_suggestions(self, doc_info: dict[str, Any], options): + """Display suggestions for a single document.""" + from documents.models import Correspondent + from documents.models import DocumentType + from documents.models import StoragePath + + self.stdout.write( + f"\n Document #{doc_info['id']}: {doc_info['title']}", + ) + + suggestions = doc_info["suggestions"] + + # Tags + if suggestions.get("tags"): + self.stdout.write(" Tags:") + for tag_id, conf in suggestions["tags"][:3]: # Show first 3 + try: + tag = Tag.objects.get(pk=tag_id) + self.stdout.write( + f" • {tag.name} (confidence: {conf:.0%})", + ) + except Tag.DoesNotExist: + pass + + # Correspondent + if suggestions.get("correspondent"): + corr_id, conf = suggestions["correspondent"] + try: + correspondent = Correspondent.objects.get(pk=corr_id) + self.stdout.write( + f" Correspondent: {correspondent.name} (confidence: {conf:.0%})", + ) + except Correspondent.DoesNotExist: + pass + + # Document Type + if suggestions.get("document_type"): + type_id, conf = suggestions["document_type"] + try: + doc_type = DocumentType.objects.get(pk=type_id) + self.stdout.write( + f" Document Type: {doc_type.name} (confidence: {conf:.0%})", + ) + except DocumentType.DoesNotExist: + pass + + # Storage Path + if suggestions.get("storage_path"): + path_id, conf = suggestions["storage_path"] + try: + storage_path = StoragePath.objects.get(pk=path_id) + self.stdout.write( + f" Storage Path: {storage_path.name} (confidence: {conf:.0%})", + ) + except StoragePath.DoesNotExist: + pass + + # Title suggestion + if suggestions.get("title_suggestion"): + self.stdout.write( + f" Title: {suggestions['title_suggestion']}", + ) + + # Applied changes (if auto-apply was enabled) + if doc_info.get("applied"): + applied = doc_info["applied"].get("applied", {}) + if any(applied.values()): + self.stdout.write( + self.style.SUCCESS(" ✓ Applied changes:"), + ) + if applied.get("tags"): + tag_names = [t["name"] for t in applied["tags"]] + self.stdout.write( + f" • Tags: {', '.join(tag_names)}", + ) + if applied.get("correspondent"): + self.stdout.write( + f" • Correspondent: {applied['correspondent']['name']}", + ) + if applied.get("document_type"): + self.stdout.write( + f" • Type: {applied['document_type']['name']}", + ) + if applied.get("storage_path"): + self.stdout.write( + f" • Path: {applied['storage_path']['name']}", + ) diff --git a/src/documents/tests/test_management_scan_ai.py b/src/documents/tests/test_management_scan_ai.py new file mode 100644 index 000000000..35e6da069 --- /dev/null +++ b/src/documents/tests/test_management_scan_ai.py @@ -0,0 +1,442 @@ +""" +Tests for the scan_documents_ai management command. 
+""" + +from io import StringIO +from unittest import mock + +from django.core.management import CommandError +from django.core.management import call_command +from django.test import TestCase +from django.test import override_settings +from django.utils import timezone + +from documents.ai_scanner import AIScanResult +from documents.models import Correspondent +from documents.models import Document +from documents.models import DocumentType +from documents.models import Tag +from documents.tests.utils import DirectoriesMixin + + +class TestScanDocumentsAICommand(DirectoriesMixin, TestCase): + """Test cases for the scan_documents_ai management command.""" + + def setUp(self): + """Set up test data.""" + super().setUp() + + # Create test document types + self.doc_type_invoice = DocumentType.objects.create(name="Invoice") + self.doc_type_receipt = DocumentType.objects.create(name="Receipt") + + # Create test tags + self.tag_important = Tag.objects.create(name="Important") + self.tag_tax = Tag.objects.create(name="Tax") + + # Create test correspondent + self.correspondent = Correspondent.objects.create(name="Test Company") + + # Create test documents + self.doc1 = Document.objects.create( + title="Test Document 1", + content="This is a test invoice document with important information.", + mime_type="application/pdf", + checksum="ABC123", + ) + + self.doc2 = Document.objects.create( + title="Test Document 2", + content="This is another test receipt document.", + mime_type="application/pdf", + checksum="DEF456", + document_type=self.doc_type_receipt, + ) + + self.doc3 = Document.objects.create( + title="Test Document 3", + content="A third document for testing date ranges.", + mime_type="application/pdf", + checksum="GHI789", + created=timezone.now() - timezone.timedelta(days=365), + ) + + def test_command_requires_filter(self): + """Test that command requires at least one filter option.""" + with self.assertRaises(CommandError) as cm: + call_command("scan_documents_ai") + + self.assertIn("at least one filter", str(cm.exception)) + + def test_command_all_flag(self): + """Test command with --all flag.""" + # Mock the AI scanner + with mock.patch("documents.management.commands.scan_documents_ai.get_ai_scanner") as mock_scanner: + mock_instance = mock.Mock() + mock_scanner.return_value = mock_instance + + # Create a mock scan result + mock_result = AIScanResult() + mock_result.tags = [(self.tag_important.id, 0.85)] + mock_instance.scan_document.return_value = mock_result + + out = StringIO() + call_command( + "scan_documents_ai", + "--all", + "--dry-run", + "--no-progress-bar", + stdout=out, + ) + + output = out.getvalue() + self.assertIn("Processing Complete", output) + self.assertIn("Documents processed:", output) + + def test_command_filter_by_type(self): + """Test command with --filter-by-type option.""" + with mock.patch("documents.management.commands.scan_documents_ai.get_ai_scanner") as mock_scanner: + mock_instance = mock.Mock() + mock_scanner.return_value = mock_instance + + mock_result = AIScanResult() + mock_instance.scan_document.return_value = mock_result + + out = StringIO() + call_command( + "scan_documents_ai", + "--filter-by-type", + str(self.doc_type_receipt.id), + "--dry-run", + "--no-progress-bar", + stdout=out, + ) + + # Should only scan doc2 which has the receipt type + self.assertEqual(mock_instance.scan_document.call_count, 1) + + def test_command_invalid_document_type(self): + """Test command with invalid document type ID.""" + with self.assertRaises(CommandError) as cm: + 
call_command( + "scan_documents_ai", + "--filter-by-type", + "99999", + "--dry-run", + ) + + self.assertIn("does not exist", str(cm.exception)) + + def test_command_date_range(self): + """Test command with --date-range option.""" + with mock.patch("documents.management.commands.scan_documents_ai.get_ai_scanner") as mock_scanner: + mock_instance = mock.Mock() + mock_scanner.return_value = mock_instance + + mock_result = AIScanResult() + mock_instance.scan_document.return_value = mock_result + + # Test with a date range that includes recent documents + today = timezone.now().date() + yesterday = (timezone.now() - timezone.timedelta(days=1)).date() + + out = StringIO() + call_command( + "scan_documents_ai", + "--date-range", + str(yesterday), + str(today), + "--dry-run", + "--no-progress-bar", + stdout=out, + ) + + # Should scan doc1 and doc2 (recent), not doc3 (old) + self.assertGreaterEqual(mock_instance.scan_document.call_count, 2) + + def test_command_invalid_date_range(self): + """Test command with invalid date range.""" + with self.assertRaises(CommandError) as cm: + call_command( + "scan_documents_ai", + "--date-range", + "2024-12-31", + "2024-01-01", # End before start + "--dry-run", + ) + + self.assertIn("Start date must be before end date", str(cm.exception)) + + def test_command_invalid_date_format(self): + """Test command with invalid date format.""" + with self.assertRaises(CommandError) as cm: + call_command( + "scan_documents_ai", + "--date-range", + "01/01/2024", # Wrong format + "12/31/2024", + "--dry-run", + ) + + self.assertIn("Invalid date format", str(cm.exception)) + + def test_command_id_range(self): + """Test command with --id-range option.""" + with mock.patch("documents.management.commands.scan_documents_ai.get_ai_scanner") as mock_scanner: + mock_instance = mock.Mock() + mock_scanner.return_value = mock_instance + + mock_result = AIScanResult() + mock_instance.scan_document.return_value = mock_result + + out = StringIO() + call_command( + "scan_documents_ai", + "--id-range", + str(self.doc1.id), + str(self.doc1.id), + "--dry-run", + "--no-progress-bar", + stdout=out, + ) + + # Should only scan doc1 + self.assertEqual(mock_instance.scan_document.call_count, 1) + + def test_command_confidence_threshold(self): + """Test command with custom confidence threshold.""" + with mock.patch("documents.management.commands.scan_documents_ai.get_ai_scanner") as mock_scanner: + mock_instance = mock.Mock() + mock_scanner.return_value = mock_instance + + # Create mock result with low confidence + mock_result = AIScanResult() + mock_result.tags = [(self.tag_important.id, 0.50)] # Low confidence + mock_instance.scan_document.return_value = mock_result + + out = StringIO() + call_command( + "scan_documents_ai", + "--all", + "--dry-run", + "--confidence-threshold", + "0.40", # Lower threshold + "--no-progress-bar", + stdout=out, + ) + + output = out.getvalue() + # Should show suggestions with low confidence + self.assertIn("suggestions generated", output.lower()) + + def test_command_invalid_confidence_threshold(self): + """Test command with invalid confidence threshold.""" + with self.assertRaises(CommandError) as cm: + call_command( + "scan_documents_ai", + "--all", + "--confidence-threshold", + "1.5", # Invalid (> 1.0) + "--dry-run", + ) + + self.assertIn("between 0.0 and 1.0", str(cm.exception)) + + def test_command_auto_apply(self): + """Test command with --auto-apply-high-confidence.""" + with mock.patch("documents.management.commands.scan_documents_ai.get_ai_scanner") as 
mock_scanner: + mock_instance = mock.Mock() + mock_scanner.return_value = mock_instance + + # Create mock result with high confidence + mock_result = AIScanResult() + mock_result.tags = [(self.tag_important.id, 0.90)] + mock_instance.scan_document.return_value = mock_result + + # Mock apply_scan_results + mock_instance.apply_scan_results.return_value = { + "applied": { + "tags": [{"id": self.tag_important.id, "name": "Important"}], + }, + "suggestions": {}, + } + + out = StringIO() + call_command( + "scan_documents_ai", + "--all", + "--auto-apply-high-confidence", + "--no-progress-bar", + stdout=out, + ) + + # Should call apply_scan_results with auto_apply=True + self.assertTrue(mock_instance.apply_scan_results.called) + call_args = mock_instance.apply_scan_results.call_args + self.assertTrue(call_args[1]["auto_apply"]) + + def test_command_dry_run_does_not_apply(self): + """Test that dry run mode does not apply changes.""" + with mock.patch("documents.management.commands.scan_documents_ai.get_ai_scanner") as mock_scanner: + mock_instance = mock.Mock() + mock_scanner.return_value = mock_instance + + mock_result = AIScanResult() + mock_result.tags = [(self.tag_important.id, 0.90)] + mock_instance.scan_document.return_value = mock_result + + out = StringIO() + call_command( + "scan_documents_ai", + "--all", + "--dry-run", + "--auto-apply-high-confidence", # Should be ignored + "--no-progress-bar", + stdout=out, + ) + + # Should not call apply_scan_results in dry-run mode + self.assertFalse(mock_instance.apply_scan_results.called) + + output = out.getvalue() + self.assertIn("DRY RUN", output) + + def test_command_handles_document_without_content(self): + """Test that command handles documents without content gracefully.""" + # Create document without content + doc_no_content = Document.objects.create( + title="No Content Doc", + content="", # Empty content + mime_type="application/pdf", + checksum="EMPTY123", + ) + + with mock.patch("documents.management.commands.scan_documents_ai.get_ai_scanner") as mock_scanner: + mock_instance = mock.Mock() + mock_scanner.return_value = mock_instance + + mock_result = AIScanResult() + mock_instance.scan_document.return_value = mock_result + + out = StringIO() + call_command( + "scan_documents_ai", + "--id-range", + str(doc_no_content.id), + str(doc_no_content.id), + "--dry-run", + "--no-progress-bar", + stdout=out, + ) + + # Should not call scan_document for empty content + self.assertEqual(mock_instance.scan_document.call_count, 0) + + def test_command_handles_scanner_error(self): + """Test that command handles scanner errors gracefully.""" + with mock.patch("documents.management.commands.scan_documents_ai.get_ai_scanner") as mock_scanner: + mock_instance = mock.Mock() + mock_scanner.return_value = mock_instance + + # Make scan_document raise an exception + mock_instance.scan_document.side_effect = Exception("Scanner error") + + out = StringIO() + call_command( + "scan_documents_ai", + "--all", + "--dry-run", + "--no-progress-bar", + stdout=out, + ) + + output = out.getvalue() + # Should report errors + self.assertIn("Errors encountered:", output) + + def test_command_batch_processing(self): + """Test that command processes documents in batches.""" + # Create more documents + for i in range(10): + Document.objects.create( + title=f"Batch Doc {i}", + content=f"Content {i}", + mime_type="application/pdf", + checksum=f"BATCH{i}", + ) + + with mock.patch("documents.management.commands.scan_documents_ai.get_ai_scanner") as mock_scanner: + mock_instance = 
mock.Mock() + mock_scanner.return_value = mock_instance + + mock_result = AIScanResult() + mock_instance.scan_document.return_value = mock_result + + out = StringIO() + call_command( + "scan_documents_ai", + "--all", + "--dry-run", + "--batch-size", + "5", + "--no-progress-bar", + stdout=out, + ) + + # Should process all documents + self.assertGreaterEqual(mock_instance.scan_document.call_count, 10) + + def test_command_displays_suggestions(self): + """Test that command displays suggestions in output.""" + with mock.patch("documents.management.commands.scan_documents_ai.get_ai_scanner") as mock_scanner: + mock_instance = mock.Mock() + mock_scanner.return_value = mock_instance + + # Create comprehensive scan result + mock_result = AIScanResult() + mock_result.tags = [(self.tag_important.id, 0.85)] + mock_result.correspondent = (self.correspondent.id, 0.80) + mock_result.document_type = (self.doc_type_invoice.id, 0.90) + mock_result.title_suggestion = "Suggested Title" + mock_instance.scan_document.return_value = mock_result + + out = StringIO() + call_command( + "scan_documents_ai", + "--id-range", + str(self.doc1.id), + str(self.doc1.id), + "--dry-run", + "--no-progress-bar", + stdout=out, + ) + + output = out.getvalue() + # Should display various suggestion types + self.assertIn("Sample Suggestions", output) + self.assertIn("Tags:", output) + self.assertIn("Correspondent:", output) + self.assertIn("Document Type:", output) + + @override_settings(PAPERLESS_ENABLE_AI_SCANNER=False) + def test_command_works_when_ai_disabled(self): + """Test that command can run even if AI scanner is disabled in settings.""" + with mock.patch("documents.management.commands.scan_documents_ai.get_ai_scanner") as mock_scanner: + mock_instance = mock.Mock() + mock_scanner.return_value = mock_instance + + mock_result = AIScanResult() + mock_instance.scan_document.return_value = mock_result + + out = StringIO() + # Should not raise an error + call_command( + "scan_documents_ai", + "--all", + "--dry-run", + "--no-progress-bar", + stdout=out, + ) + + output = out.getvalue() + self.assertIn("Processing Complete", output)
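
A minimal usage sketch, based only on the arguments defined by the new command and the invocation pattern used in the new test module (the same flags can be passed on the command line, e.g. as "python3 manage.py scan_documents_ai ..."); it assumes the patch is applied and Django is configured, for example inside "manage.py shell":

    from django.core.management import call_command

    # Preview suggestions for every document without changing anything.
    call_command(
        "scan_documents_ai",
        "--all",
        "--dry-run",
        "--no-progress-bar",
    )

    # Re-scan documents created in 2024 and auto-apply suggestions with
    # >= 80% confidence; suggestions above the default 0.60 threshold are
    # still listed in the summary for manual review.
    call_command(
        "scan_documents_ai",
        "--date-range", "2024-01-01", "2024-12-31",
        "--auto-apply-high-confidence",
    )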