Changes before error encountered

Co-authored-by: dawnsystem <42047891+dawnsystem@users.noreply.github.com>
This commit is contained in:
copilot-swe-agent[bot] 2025-11-12 15:39:22 +00:00
parent 275ff4d1d4
commit cc9e66c11c
2 changed files with 1015 additions and 0 deletions

View file

@ -0,0 +1,573 @@
"""
Management command to apply AI scanner to existing documents.
This command allows batch processing of documents through the AI scanner,
enabling metadata suggestions for documents that were added before the
AI scanner was implemented or to re-scan documents with updated AI models.
"""
import logging
from datetime import datetime
from typing import Any
import tqdm
from django.core.management.base import BaseCommand
from django.core.management.base import CommandError
from django.utils import timezone
from documents.ai_scanner import AIScanResult
from documents.ai_scanner import get_ai_scanner
from documents.management.commands.mixins import ProgressBarMixin
from documents.models import Document
from documents.models import DocumentType
from documents.models import Tag
# Module-level logger for this command; skipped documents (warning) and
# per-document scan failures (error) are reported through it.
logger = logging.getLogger("paperless.management.scan_documents_ai")
class Command(ProgressBarMixin, BaseCommand):
    """
    Management command to apply AI scanner to existing documents.

    This command processes existing documents through the comprehensive AI scanner
    to generate metadata suggestions (tags, correspondents, document types, etc.).
    """

    help = (
        "Apply AI scanner to existing documents to generate metadata suggestions. "
        "Supports filtering by document type, date range, and auto-apply for high "
        "confidence suggestions. Use --dry-run to preview suggestions without applying."
    )

    def add_arguments(self, parser):
        """Register the filtering, processing, progress-bar and batching options."""
        # Filtering options
        parser.add_argument(
            "--all",
            action="store_true",
            default=False,
            help="Scan all documents in the system",
        )
        parser.add_argument(
            "--filter-by-type",
            type=int,
            nargs="+",
            metavar="TYPE_ID",
            help="Filter documents by document type ID(s). Can specify multiple IDs.",
        )
        parser.add_argument(
            "--date-range",
            nargs=2,
            metavar=("START_DATE", "END_DATE"),
            help=(
                "Filter documents by creation date range. "
                "Format: YYYY-MM-DD YYYY-MM-DD. Example: 2024-01-01 2024-12-31"
            ),
        )
        parser.add_argument(
            "--id-range",
            nargs=2,
            type=int,
            metavar=("START_ID", "END_ID"),
            help="Filter documents by ID range. Example: 1 100",
        )
        # Processing options
        parser.add_argument(
            "--dry-run",
            action="store_true",
            default=False,
            help="Preview suggestions without applying any changes",
        )
        parser.add_argument(
            "--auto-apply-high-confidence",
            action="store_true",
            default=False,
            help=(
                "Automatically apply suggestions with high confidence (>=80%%). "
                "Lower confidence suggestions will still be shown for review."
            ),
        )
        parser.add_argument(
            "--confidence-threshold",
            type=float,
            default=0.60,
            help=(
                "Minimum confidence threshold for showing suggestions (0.0-1.0). "
                "Default: 0.60 (60%%)"
            ),
        )
        # Progress bar
        self.add_argument_progress_bar_mixin(parser)
        # Batch size for processing
        parser.add_argument(
            "--batch-size",
            type=int,
            default=100,
            help="Number of documents to process in memory at once. Default: 100",
        )

    def handle(self, *args, **options):
        """
        Execute the command.

        Validates the options, builds the filtered queryset, scans every
        matching document and prints a summary.

        Raises:
            CommandError: If the options are invalid or the AI scanner
                cannot be initialized.
        """
        self.handle_progress_bar_mixin(**options)
        self._validate_arguments(options)

        queryset = self._build_queryset(options)
        document_count = queryset.count()
        if document_count == 0:
            self.stdout.write(
                self.style.WARNING("No documents found matching the specified filters."),
            )
            return

        try:
            scanner = get_ai_scanner()
        except Exception as e:
            # Chain the cause so the original traceback is kept with --traceback.
            raise CommandError(f"Failed to initialize AI scanner: {e}") from e

        self._display_operation_summary(options, document_count)
        results = self._process_documents(
            queryset=queryset,
            scanner=scanner,
            options=options,
        )
        self._display_final_summary(results, options)

    def _validate_arguments(self, options):
        """
        Validate command line arguments.

        On success the parsed, timezone-aware date bounds are stored in
        ``options`` under ``_parsed_start_date`` / ``_parsed_end_date`` for
        ``_build_queryset``.

        Raises:
            CommandError: If no filter was given, or the confidence threshold,
                batch size, date range, ID range or a document type ID is
                invalid.
        """
        # At least one filter must be specified
        if not any([
            options["all"],
            options["filter_by_type"],
            options["date_range"],
            options["id_range"],
        ]):
            raise CommandError(
                "You must specify at least one filter: "
                "--all, --filter-by-type, --date-range, or --id-range",
            )

        # Validate confidence threshold
        if not 0.0 <= options["confidence_threshold"] <= 1.0:
            raise CommandError("Confidence threshold must be between 0.0 and 1.0")

        # A zero batch size would make range() raise a bare ValueError and a
        # negative one would silently process nothing, so reject both up front.
        batch_size = options.get("batch_size")
        if batch_size is not None and batch_size < 1:
            raise CommandError("Batch size must be a positive integer")

        # Validate date range format
        if options["date_range"]:
            start_str, end_str = options["date_range"]
            try:
                start_date = datetime.strptime(start_str, "%Y-%m-%d")
                end_date = datetime.strptime(end_str, "%Y-%m-%d")
            except ValueError as e:
                raise CommandError(
                    f"Invalid date format. Use YYYY-MM-DD. Error: {e}",
                ) from e
            if start_date > end_date:
                raise CommandError("Start date must be before end date")
            # Store parsed dates for later use. The end bound covers the whole
            # final day, including its last fractional second.
            options["_parsed_start_date"] = timezone.make_aware(start_date)
            options["_parsed_end_date"] = timezone.make_aware(
                end_date.replace(hour=23, minute=59, second=59, microsecond=999999),
            )

        # Reject a reversed ID range the same way a reversed date range is rejected.
        if options["id_range"]:
            start_id, end_id = options["id_range"]
            if start_id > end_id:
                raise CommandError("Start ID must be less than or equal to end ID")

        # Validate document types exist (single query instead of one per ID)
        if options["filter_by_type"]:
            requested_ids = options["filter_by_type"]
            existing_ids = set(
                DocumentType.objects.filter(pk__in=requested_ids).values_list(
                    "pk",
                    flat=True,
                ),
            )
            for type_id in requested_ids:
                if type_id not in existing_ids:
                    raise CommandError(
                        f"Document type with ID {type_id} does not exist",
                    )

    def _build_queryset(self, options):
        """
        Build document queryset based on filters.

        All given filters are combined (AND). Expects ``_validate_arguments``
        to have run first when ``date_range`` is set, because the parsed
        bounds are read from ``options``.

        Returns:
            A ``Document`` queryset ordered by ID.
        """
        queryset = Document.objects.all()

        if options["filter_by_type"]:
            queryset = queryset.filter(document_type__id__in=options["filter_by_type"])

        if options["date_range"]:
            queryset = queryset.filter(
                created__gte=options["_parsed_start_date"],
                created__lte=options["_parsed_end_date"],
            )

        if options["id_range"]:
            start_id, end_id = options["id_range"]
            queryset = queryset.filter(id__gte=start_id, id__lte=end_id)

        # Order by ID for consistent processing
        return queryset.order_by("id")

    def _display_operation_summary(self, options, document_count):
        """Display the active filters, processing mode and document count before starting."""
        self.stdout.write(self.style.SUCCESS("\n" + "=" * 70))
        self.stdout.write(self.style.SUCCESS("AI Document Scanner - Batch Processing"))
        self.stdout.write(self.style.SUCCESS("=" * 70 + "\n"))

        # Display filters
        self.stdout.write("Filters applied:")
        if options["all"]:
            self.stdout.write(" • Processing ALL documents")
        if options["filter_by_type"]:
            type_ids = ", ".join(str(tid) for tid in options["filter_by_type"])
            self.stdout.write(f" • Document types: {type_ids}")
        if options["date_range"]:
            start, end = options["date_range"]
            self.stdout.write(f" • Date range: {start} to {end}")
        if options["id_range"]:
            start, end = options["id_range"]
            self.stdout.write(f" • ID range: {start} to {end}")

        # Display processing mode
        self.stdout.write("\nProcessing mode:")
        if options["dry_run"]:
            self.stdout.write(self.style.WARNING(" • DRY RUN - No changes will be applied"))
        elif options["auto_apply_high_confidence"]:
            self.stdout.write(" • Auto-apply high confidence suggestions (≥80%)")
        else:
            self.stdout.write(" • Preview mode - No changes will be applied")
        self.stdout.write(
            f" • Confidence threshold: {options['confidence_threshold']:.0%}",
        )

        # Display document count
        self.stdout.write(
            f"\n{self.style.SUCCESS('Documents to process:')} {document_count}",
        )
        self.stdout.write("\n" + "=" * 70 + "\n")

    def _process_documents(
        self,
        queryset,
        scanner,
        options,
    ) -> dict[str, Any]:
        """
        Process documents through the AI scanner.

        Documents are fetched in batches of ``options["batch_size"]`` using
        keyset pagination on the primary key. Offset-based slicing would skip
        documents whenever an auto-applied change removes an already processed
        document from the filtered queryset (for example when its document
        type changes while ``--filter-by-type`` is active).

        Returns:
            Dictionary with processing results and statistics
        """
        results: dict[str, Any] = {
            "processed": 0,
            "errors": 0,
            "suggestions_generated": 0,
            "auto_applied": 0,
            "documents_with_suggestions": [],
            "error_documents": [],
        }
        batch_size = options["batch_size"]
        confidence_threshold = options["confidence_threshold"]
        # --dry-run always wins over --auto-apply-high-confidence.
        auto_apply = options["auto_apply_high_confidence"] and not options["dry_run"]

        # Keyset pagination needs a deterministic primary-key order.
        ordered = queryset.order_by("pk")
        total_docs = ordered.count()
        last_pk = None

        for _ in tqdm.tqdm(
            range(0, total_docs, batch_size),
            disable=self.no_progress_bar,
            desc="Processing batches",
        ):
            page = ordered if last_pk is None else ordered.filter(pk__gt=last_pk)
            batch = list(page[:batch_size])
            if not batch:
                break
            last_pk = batch[-1].pk
            for document in batch:
                self._scan_single_document(
                    document,
                    scanner,
                    confidence_threshold,
                    auto_apply,
                    results,
                )
        return results

    def _scan_single_document(
        self,
        document,
        scanner,
        confidence_threshold: float,
        auto_apply: bool,
        results: dict[str, Any],
    ) -> None:
        """Scan one document and fold the outcome into ``results`` (mutated in place)."""
        try:
            document_text = document.content or ""
            if not document_text:
                logger.warning(
                    f"Document {document.id} has no text content, skipping",
                )
                return

            scan_result = scanner.scan_document(
                document=document,
                document_text=document_text,
            )
            filtered_result = self._filter_by_confidence(
                scan_result,
                confidence_threshold,
            )

            suggestion_count = self._count_suggestions(filtered_result)
            if suggestion_count > 0:
                results["suggestions_generated"] += suggestion_count
                applied = None
                if auto_apply:
                    # NOTE(review): the >=80% cut-off advertised by
                    # --auto-apply-high-confidence is not enforced here;
                    # presumably apply_scan_results enforces it - confirm.
                    applied = scanner.apply_scan_results(
                        document=document,
                        scan_result=filtered_result,
                        auto_apply=True,
                    )
                    results["auto_applied"] += self._count_applied(applied)
                # Store for summary
                results["documents_with_suggestions"].append({
                    "id": document.id,
                    "title": document.title,
                    "suggestions": filtered_result.to_dict(),
                    "applied": applied,
                })
            results["processed"] += 1
        except Exception as e:
            # One failing document must not abort the whole batch run.
            logger.error(
                f"Error processing document {document.id}: {e}",
                exc_info=True,
            )
            results["errors"] += 1
            results["error_documents"].append({
                "id": document.id,
                "title": document.title,
                "error": str(e),
            })

    @staticmethod
    def _count_applied(applied: dict[str, Any]) -> int:
        """
        Count the changes reported under the ``"applied"`` key of an apply result.

        Counts every applied tag plus one for each of correspondent, document
        type and storage path that is present - the same entries
        ``_display_document_suggestions`` reports as applied.
        """
        changes = applied.get("applied", {}) or {}
        count = len(changes.get("tags") or [])
        count += sum(
            1
            for key in ("correspondent", "document_type", "storage_path")
            if changes.get(key)
        )
        return count

    def _filter_by_confidence(
        self,
        scan_result: AIScanResult,
        threshold: float,
    ) -> AIScanResult:
        """
        Return a new scan result keeping only suggestions with confidence >= threshold.

        Tags, correspondent, document type, storage path, custom fields and
        workflows are filtered. Extracted entities, the title suggestion and
        metadata carry no confidence here and are copied unchanged.
        """
        filtered = AIScanResult()

        filtered.tags = [
            (tag_id, conf) for tag_id, conf in scan_result.tags
            if conf >= threshold
        ]

        # Single-valued suggestions are (object_id, confidence) pairs.
        if scan_result.correspondent:
            _, conf = scan_result.correspondent
            if conf >= threshold:
                filtered.correspondent = scan_result.correspondent
        if scan_result.document_type:
            _, conf = scan_result.document_type
            if conf >= threshold:
                filtered.document_type = scan_result.document_type
        if scan_result.storage_path:
            _, conf = scan_result.storage_path
            if conf >= threshold:
                filtered.storage_path = scan_result.storage_path

        for field_id, (value, conf) in scan_result.custom_fields.items():
            if conf >= threshold:
                filtered.custom_fields[field_id] = (value, conf)

        filtered.workflows = [
            (wf_id, conf) for wf_id, conf in scan_result.workflows
            if conf >= threshold
        ]

        # Copy other fields as-is
        filtered.extracted_entities = scan_result.extracted_entities
        filtered.title_suggestion = scan_result.title_suggestion
        filtered.metadata = scan_result.metadata
        return filtered

    def _count_suggestions(self, scan_result: AIScanResult) -> int:
        """
        Count total number of suggestions in scan result.

        Each tag, custom field and workflow counts once; correspondent,
        document type, storage path and title suggestion count once each
        when set.
        """
        count = 0
        count += len(scan_result.tags)
        count += 1 if scan_result.correspondent else 0
        count += 1 if scan_result.document_type else 0
        count += 1 if scan_result.storage_path else 0
        count += len(scan_result.custom_fields)
        count += len(scan_result.workflows)
        count += 1 if scan_result.title_suggestion else 0
        return count

    def _display_final_summary(self, results: dict[str, Any], options):
        """
        Display final summary of processing results.

        Prints statistics, the suggestions of the first 5 documents, the
        first 10 errors and a closing message matching the processing mode.
        """
        self.stdout.write("\n" + "=" * 70)
        self.stdout.write(self.style.SUCCESS("Processing Complete - Summary"))
        self.stdout.write("=" * 70 + "\n")

        # Display statistics
        self.stdout.write("Statistics:")
        self.stdout.write(f" • Documents processed: {results['processed']}")
        self.stdout.write(f" • Documents with suggestions: {len(results['documents_with_suggestions'])}")
        self.stdout.write(f" • Total suggestions generated: {results['suggestions_generated']}")
        if options["auto_apply_high_confidence"] and not options["dry_run"]:
            self.stdout.write(
                self.style.SUCCESS(f" • Suggestions auto-applied: {results['auto_applied']}"),
            )
        if results["errors"] > 0:
            self.stdout.write(
                self.style.ERROR(f" • Errors encountered: {results['errors']}"),
            )

        # Display sample suggestions
        if results["documents_with_suggestions"]:
            self.stdout.write("\n" + "-" * 70)
            self.stdout.write("Sample Suggestions (first 5 documents):\n")
            for doc_info in results["documents_with_suggestions"][:5]:
                self._display_document_suggestions(doc_info, options)

        # Display errors
        if results["error_documents"]:
            self.stdout.write("\n" + "-" * 70)
            self.stdout.write(self.style.ERROR("Errors:\n"))
            for error_info in results["error_documents"][:10]:
                self.stdout.write(
                    f" • Document {error_info['id']}: {error_info['title']}",
                )
                self.stdout.write(f" Error: {error_info['error']}")

        # Final message
        self.stdout.write("\n" + "=" * 70)
        if options["dry_run"]:
            self.stdout.write(
                self.style.WARNING(
                    "DRY RUN completed - No changes were applied to documents.",
                ),
            )
        elif options["auto_apply_high_confidence"]:
            self.stdout.write(
                self.style.SUCCESS(
                    f"Processing complete - {results['auto_applied']} high confidence "
                    "suggestions were automatically applied.",
                ),
            )
        else:
            self.stdout.write(
                self.style.SUCCESS(
                    "Processing complete - Suggestions generated. Use "
                    "--auto-apply-high-confidence to apply them automatically.",
                ),
            )
        self.stdout.write("=" * 70 + "\n")

    def _display_document_suggestions(self, doc_info: dict[str, Any], options):
        """
        Display suggestions for a single document.

        Shows up to three tags, the correspondent, document type, storage
        path and title suggestion, followed by any auto-applied changes.
        Suggestions whose referenced object no longer exists are left out of
        the output and logged at debug level. ``options`` is accepted for
        interface compatibility and is not used.
        """
        # Imported locally as in the original; DocumentType and Tag are
        # already available from the module-level imports.
        from documents.models import Correspondent
        from documents.models import StoragePath

        self.stdout.write(
            f"\n Document #{doc_info['id']}: {doc_info['title']}",
        )
        suggestions = doc_info["suggestions"]

        # Tags
        if suggestions.get("tags"):
            self.stdout.write(" Tags:")
            for tag_id, conf in suggestions["tags"][:3]:  # Show first 3
                try:
                    tag = Tag.objects.get(pk=tag_id)
                except Tag.DoesNotExist:
                    logger.debug(f"Suggested tag {tag_id} does not exist, not displayed")
                    continue
                self.stdout.write(
                    f"{tag.name} (confidence: {conf:.0%})",
                )

        # Correspondent, document type and storage path share one layout.
        single_valued = (
            ("correspondent", Correspondent, "Correspondent"),
            ("document_type", DocumentType, "Document Type"),
            ("storage_path", StoragePath, "Storage Path"),
        )
        for key, model, label in single_valued:
            if not suggestions.get(key):
                continue
            object_id, conf = suggestions[key]
            try:
                obj = model.objects.get(pk=object_id)
            except model.DoesNotExist:
                logger.debug(f"Suggested {key} {object_id} does not exist, not displayed")
                continue
            self.stdout.write(
                f" {label}: {obj.name} (confidence: {conf:.0%})",
            )

        # Title suggestion
        if suggestions.get("title_suggestion"):
            self.stdout.write(
                f" Title: {suggestions['title_suggestion']}",
            )

        # Applied changes (if auto-apply was enabled)
        if doc_info.get("applied"):
            applied = doc_info["applied"].get("applied", {})
            if any(applied.values()):
                self.stdout.write(
                    self.style.SUCCESS(" ✓ Applied changes:"),
                )
                if applied.get("tags"):
                    tag_names = [t["name"] for t in applied["tags"]]
                    self.stdout.write(
                        f" • Tags: {', '.join(tag_names)}",
                    )
                for key, label in (
                    ("correspondent", "Correspondent"),
                    ("document_type", "Type"),
                    ("storage_path", "Path"),
                ):
                    if applied.get(key):
                        self.stdout.write(
                            f" • {label}: {applied[key]['name']}",
                        )

View file

@ -0,0 +1,442 @@
"""
Tests for the scan_documents_ai management command.
"""
from datetime import timedelta
from io import StringIO
from unittest import mock

from django.core.management import CommandError
from django.core.management import call_command
from django.test import TestCase
from django.test import override_settings
from django.utils import timezone

from documents.ai_scanner import AIScanResult
from documents.models import Correspondent
from documents.models import Document
from documents.models import DocumentType
from documents.models import Tag
from documents.tests.utils import DirectoriesMixin
class TestScanDocumentsAICommand(DirectoriesMixin, TestCase):
    """Test cases for the scan_documents_ai management command."""

    # Import path of the scanner factory as seen by the command module.
    SCANNER_PATH = "documents.management.commands.scan_documents_ai.get_ai_scanner"
    # Logger the command reports per-document failures to.
    COMMAND_LOGGER = "paperless.management.scan_documents_ai"

    def setUp(self):
        """Create two document types, two tags, a correspondent and three documents."""
        super().setUp()
        # Create test document types
        self.doc_type_invoice = DocumentType.objects.create(name="Invoice")
        self.doc_type_receipt = DocumentType.objects.create(name="Receipt")
        # Create test tags
        self.tag_important = Tag.objects.create(name="Important")
        self.tag_tax = Tag.objects.create(name="Tax")
        # Create test correspondent
        self.correspondent = Correspondent.objects.create(name="Test Company")
        # Create test documents
        self.doc1 = Document.objects.create(
            title="Test Document 1",
            content="This is a test invoice document with important information.",
            mime_type="application/pdf",
            checksum="ABC123",
        )
        self.doc2 = Document.objects.create(
            title="Test Document 2",
            content="This is another test receipt document.",
            mime_type="application/pdf",
            checksum="DEF456",
            document_type=self.doc_type_receipt,
        )
        # Created a year ago so date-range tests can exclude it.
        self.doc3 = Document.objects.create(
            title="Test Document 3",
            content="A third document for testing date ranges.",
            mime_type="application/pdf",
            checksum="GHI789",
            created=timezone.now() - timedelta(days=365),
        )

    def _patch_scanner(self, scan_result=None):
        """
        Replace ``get_ai_scanner`` for the rest of the test.

        Returns the mock scanner, whose ``scan_document`` returns
        ``scan_result`` (an empty ``AIScanResult`` by default).
        """
        patcher = mock.patch(self.SCANNER_PATH)
        factory = patcher.start()
        self.addCleanup(patcher.stop)
        scanner = mock.Mock()
        scanner.scan_document.return_value = (
            scan_result if scan_result is not None else AIScanResult()
        )
        factory.return_value = scanner
        return scanner

    def _run(self, *args):
        """Run the command with ``args`` plus ``--no-progress-bar``; return its stdout."""
        out = StringIO()
        call_command("scan_documents_ai", *args, "--no-progress-bar", stdout=out)
        return out.getvalue()

    def test_command_requires_filter(self):
        """Test that command requires at least one filter option."""
        with self.assertRaises(CommandError) as cm:
            call_command("scan_documents_ai")
        self.assertIn("at least one filter", str(cm.exception))

    def test_command_all_flag(self):
        """Test command with --all flag."""
        scan_result = AIScanResult()
        scan_result.tags = [(self.tag_important.id, 0.85)]
        scanner = self._patch_scanner(scan_result)

        output = self._run("--all", "--dry-run")

        self.assertIn("Processing Complete", output)
        # All three setUp documents have content, so all are scanned.
        self.assertIn("Documents processed: 3", output)
        self.assertEqual(scanner.scan_document.call_count, 3)

    def test_command_filter_by_type(self):
        """Test command with --filter-by-type option."""
        scanner = self._patch_scanner()

        self._run("--filter-by-type", str(self.doc_type_receipt.id), "--dry-run")

        # Should only scan doc2 which has the receipt type
        self.assertEqual(scanner.scan_document.call_count, 1)
        self.assertEqual(
            scanner.scan_document.call_args.kwargs["document"],
            self.doc2,
        )

    def test_command_invalid_document_type(self):
        """Test command with invalid document type ID."""
        with self.assertRaises(CommandError) as cm:
            call_command(
                "scan_documents_ai",
                "--filter-by-type",
                "99999",
                "--dry-run",
            )
        self.assertIn("does not exist", str(cm.exception))

    def test_command_date_range(self):
        """Test command with --date-range option."""
        scanner = self._patch_scanner()
        # The command interprets the dates in the current time zone, so use
        # the local date rather than the UTC date of timezone.now().
        today = timezone.localdate()
        yesterday = today - timedelta(days=1)

        self._run("--date-range", str(yesterday), str(today), "--dry-run")

        # Should scan doc1 and doc2 (recent), not doc3 (old)
        self.assertEqual(scanner.scan_document.call_count, 2)

    def test_command_invalid_date_range(self):
        """Test command with invalid date range."""
        with self.assertRaises(CommandError) as cm:
            call_command(
                "scan_documents_ai",
                "--date-range",
                "2024-12-31",
                "2024-01-01",  # End before start
                "--dry-run",
            )
        self.assertIn("Start date must be before end date", str(cm.exception))

    def test_command_invalid_date_format(self):
        """Test command with invalid date format."""
        with self.assertRaises(CommandError) as cm:
            call_command(
                "scan_documents_ai",
                "--date-range",
                "01/01/2024",  # Wrong format
                "12/31/2024",
                "--dry-run",
            )
        self.assertIn("Invalid date format", str(cm.exception))

    def test_command_id_range(self):
        """Test command with --id-range option."""
        scanner = self._patch_scanner()

        self._run("--id-range", str(self.doc1.id), str(self.doc1.id), "--dry-run")

        # Should only scan doc1
        self.assertEqual(scanner.scan_document.call_count, 1)
        self.assertEqual(
            scanner.scan_document.call_args.kwargs["document"],
            self.doc1,
        )

    def test_command_confidence_threshold(self):
        """Test command with custom confidence threshold."""
        scan_result = AIScanResult()
        scan_result.tags = [(self.tag_important.id, 0.50)]  # Low confidence
        self._patch_scanner(scan_result)

        output = self._run(
            "--all",
            "--dry-run",
            "--confidence-threshold",
            "0.40",  # Lower threshold
        )

        # The statistics line is always printed, so check that the 0.50 tag
        # actually survived the lowered threshold.
        self.assertIn("Sample Suggestions", output)
        self.assertNotIn("Total suggestions generated: 0", output)

    def test_command_invalid_confidence_threshold(self):
        """Test command with invalid confidence threshold."""
        with self.assertRaises(CommandError) as cm:
            call_command(
                "scan_documents_ai",
                "--all",
                "--confidence-threshold",
                "1.5",  # Invalid (> 1.0)
                "--dry-run",
            )
        self.assertIn("between 0.0 and 1.0", str(cm.exception))

    def test_command_auto_apply(self):
        """Test command with --auto-apply-high-confidence."""
        scan_result = AIScanResult()
        scan_result.tags = [(self.tag_important.id, 0.90)]
        scanner = self._patch_scanner(scan_result)
        scanner.apply_scan_results.return_value = {
            "applied": {
                "tags": [{"id": self.tag_important.id, "name": "Important"}],
            },
            "suggestions": {},
        }

        output = self._run("--all", "--auto-apply-high-confidence")

        # Should call apply_scan_results with auto_apply=True
        self.assertTrue(scanner.apply_scan_results.called)
        self.assertTrue(scanner.apply_scan_results.call_args.kwargs["auto_apply"])
        # One applied tag for each of the three documents.
        self.assertIn("Suggestions auto-applied: 3", output)

    def test_command_dry_run_does_not_apply(self):
        """Test that dry run mode does not apply changes."""
        scan_result = AIScanResult()
        scan_result.tags = [(self.tag_important.id, 0.90)]
        scanner = self._patch_scanner(scan_result)

        output = self._run(
            "--all",
            "--dry-run",
            "--auto-apply-high-confidence",  # Should be ignored
        )

        # Should not call apply_scan_results in dry-run mode
        self.assertFalse(scanner.apply_scan_results.called)
        self.assertIn("DRY RUN", output)

    def test_command_handles_document_without_content(self):
        """Test that command handles documents without content gracefully."""
        doc_no_content = Document.objects.create(
            title="No Content Doc",
            content="",  # Empty content
            mime_type="application/pdf",
            checksum="EMPTY123",
        )
        scanner = self._patch_scanner()

        self._run(
            "--id-range",
            str(doc_no_content.id),
            str(doc_no_content.id),
            "--dry-run",
        )

        # Should not call scan_document for empty content
        self.assertEqual(scanner.scan_document.call_count, 0)

    def test_command_handles_scanner_error(self):
        """Test that command handles scanner errors gracefully."""
        scanner = self._patch_scanner()
        scanner.scan_document.side_effect = Exception("Scanner error")

        # Capture the expected error logs instead of letting them leak
        # into the test output.
        with self.assertLogs(self.COMMAND_LOGGER, level="ERROR"):
            output = self._run("--all", "--dry-run")

        # Every one of the three documents fails and is reported.
        self.assertIn("Errors encountered: 3", output)
        self.assertIn("Scanner error", output)

    def test_command_batch_processing(self):
        """Test that command processes documents in batches."""
        for i in range(10):
            Document.objects.create(
                title=f"Batch Doc {i}",
                content=f"Content {i}",
                mime_type="application/pdf",
                checksum=f"BATCH{i}",
            )
        scanner = self._patch_scanner()

        self._run("--all", "--dry-run", "--batch-size", "5")

        # Every document (3 from setUp + 10 here) is scanned exactly once,
        # regardless of batch boundaries.
        self.assertEqual(scanner.scan_document.call_count, Document.objects.count())

    def test_command_displays_suggestions(self):
        """Test that command displays suggestions in output."""
        scan_result = AIScanResult()
        scan_result.tags = [(self.tag_important.id, 0.85)]
        scan_result.correspondent = (self.correspondent.id, 0.80)
        scan_result.document_type = (self.doc_type_invoice.id, 0.90)
        scan_result.title_suggestion = "Suggested Title"
        self._patch_scanner(scan_result)

        output = self._run(
            "--id-range",
            str(self.doc1.id),
            str(self.doc1.id),
            "--dry-run",
        )

        # Should display various suggestion types
        self.assertIn("Sample Suggestions", output)
        self.assertIn("Tags:", output)
        self.assertIn("Correspondent: Test Company", output)
        self.assertIn("Document Type: Invoice", output)

    @override_settings(PAPERLESS_ENABLE_AI_SCANNER=False)
    def test_command_works_when_ai_disabled(self):
        """Test that command can run even if AI scanner is disabled in settings."""
        self._patch_scanner()

        # Should not raise an error
        output = self._run("--all", "--dry-run")

        self.assertIn("Processing Complete", output)