# paperless-ngx/src/documents/consumer.py


import datetime
import hashlib
import os
import uuid
from subprocess import Popen

import magic
from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer
from django.conf import settings
from django.db import transaction
from django.db.models import Q
from django.utils import timezone
from filelock import FileLock
from rest_framework.reverse import reverse

from .classifier import load_classifier
from .file_handling import create_source_path_directory, \
    generate_unique_filename
from .loggers import LoggingMixin
from .models import Document, FileInfo, Correspondent, DocumentType, Tag
from .parsers import ParseError, get_parser_class_for_mime_type, parse_date
from .signals import (
    document_consumption_finished,
    document_consumption_started
)


class ConsumerError(Exception):
    pass
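

# Short message codes sent to the frontend along with progress updates over
# the websocket status channel (see Consumer._send_progress below); the web
# UI presumably maps these to human-readable, localized status text.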
MESSAGE_DOCUMENT_ALREADY_EXISTS = "document_already_exists"
MESSAGE_FILE_NOT_FOUND = "file_not_found"
MESSAGE_PRE_CONSUME_SCRIPT_NOT_FOUND = "pre_consume_script_not_found"
MESSAGE_PRE_CONSUME_SCRIPT_ERROR = "pre_consume_script_error"
MESSAGE_POST_CONSUME_SCRIPT_NOT_FOUND = "post_consume_script_not_found"
MESSAGE_POST_CONSUME_SCRIPT_ERROR = "post_consume_script_error"
MESSAGE_NEW_FILE = "new_file"
MESSAGE_UNSUPPORTED_TYPE = "unsupported_type"
MESSAGE_PARSING_DOCUMENT = "parsing_document"
MESSAGE_GENERATING_THUMBNAIL = "generating_thumbnail"
MESSAGE_PARSE_DATE = "parse_date"
MESSAGE_SAVE_DOCUMENT = "save_document"
MESSAGE_FINISHED = "finished"


class Consumer(LoggingMixin):

    logging_name = "paperless.consumer"
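
    # Push a progress update for the file currently being consumed to the
    # websocket status channel, so the web UI can render a progress bar.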
    def _send_progress(self, current_progress, max_progress, status,
                       message=None, document_id=None):
        payload = {
            'filename': os.path.basename(self.filename) if self.filename else None,  # NOQA: E501
            'task_id': self.task_id,
            'current_progress': current_progress,
            'max_progress': max_progress,
            'status': status,
            'message': message,
            'document_id': document_id
        }
        async_to_sync(self.channel_layer.group_send)("status_updates",
                                                     {'type': 'status_update',
                                                      'data': payload})
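
    # Log a failure, report it to the frontend as a final FAILED progress
    # update, and abort consumption by raising ConsumerError.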
    def _fail(self, message, log_message=None, exc_info=None):
        self._send_progress(100, 100, 'FAILED', message)
        self.log("error", log_message or message, exc_info=exc_info)
        raise ConsumerError(f"{self.filename}: {log_message or message}")

    def __init__(self):
        super().__init__()
        self.path = None
        self.filename = None
        self.override_title = None
        self.override_correspondent_id = None
        self.override_tag_ids = None
        self.override_document_type_id = None
        self.task_id = None

        self.channel_layer = get_channel_layer()
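
    # Preconditions that must hold before any parsing work starts. Each
    # check calls _fail() (and thereby raises ConsumerError) if it is not
    # met.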
    def pre_check_file_exists(self):
        if not os.path.isfile(self.path):
            self._fail(
                MESSAGE_FILE_NOT_FOUND,
                f"Cannot consume {self.path}: File not found."
            )

    def pre_check_duplicate(self):
        with open(self.path, "rb") as f:
            checksum = hashlib.md5(f.read()).hexdigest()
        if Document.objects.filter(Q(checksum=checksum) | Q(archive_checksum=checksum)).exists():  # NOQA: E501
            if settings.CONSUMER_DELETE_DUPLICATES:
                os.unlink(self.path)
            self._fail(
                MESSAGE_DOCUMENT_ALREADY_EXISTS,
                f"Not consuming {self.filename}: It is a duplicate."
            )

    def pre_check_directories(self):
        os.makedirs(settings.SCRATCH_DIR, exist_ok=True)
        os.makedirs(settings.THUMBNAIL_DIR, exist_ok=True)
        os.makedirs(settings.ORIGINALS_DIR, exist_ok=True)
        os.makedirs(settings.ARCHIVE_DIR, exist_ok=True)
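
    # Optionally run a user-configured script before parsing starts. The
    # script receives the path of the file about to be consumed as its only
    # argument.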
    def run_pre_consume_script(self):
        if not settings.PRE_CONSUME_SCRIPT:
            return

        if not os.path.isfile(settings.PRE_CONSUME_SCRIPT):
            self._fail(
                MESSAGE_PRE_CONSUME_SCRIPT_NOT_FOUND,
                f"Configured pre-consume script "
                f"{settings.PRE_CONSUME_SCRIPT} does not exist.")

        try:
            Popen((settings.PRE_CONSUME_SCRIPT, self.path)).wait()
        except Exception as e:
            self._fail(
                MESSAGE_PRE_CONSUME_SCRIPT_ERROR,
                f"Error while executing pre-consume script: {e}",
                exc_info=True
            )
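
    # Optionally run a user-configured script after the document has been
    # stored. The script receives the document id, its public file name, the
    # paths of the original and the thumbnail, the download and thumbnail
    # URLs, the correspondent and a comma-separated list of tag names as
    # arguments.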
    def run_post_consume_script(self, document):
        if not settings.POST_CONSUME_SCRIPT:
            return

        if not os.path.isfile(settings.POST_CONSUME_SCRIPT):
            self._fail(
                MESSAGE_POST_CONSUME_SCRIPT_NOT_FOUND,
                f"Configured post-consume script "
                f"{settings.POST_CONSUME_SCRIPT} does not exist."
            )

        try:
            Popen((
                settings.POST_CONSUME_SCRIPT,
                str(document.pk),
                document.get_public_filename(),
                os.path.normpath(document.source_path),
                os.path.normpath(document.thumbnail_path),
                reverse("document-download", kwargs={"pk": document.pk}),
                reverse("document-thumb", kwargs={"pk": document.pk}),
                str(document.correspondent),
                str(",".join(document.tags.all().values_list(
                    "name", flat=True)))
            )).wait()
        except Exception as e:
            self._fail(
                MESSAGE_POST_CONSUME_SCRIPT_ERROR,
                f"Error while executing post-consume script: {e}",
                exc_info=True
            )
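
    # Main entry point: runs every step needed to turn a file on disk into a
    # Document record, reporting progress to the frontend along the way.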
    def try_consume_file(self,
                         path,
                         override_filename=None,
                         override_title=None,
                         override_correspondent_id=None,
                         override_document_type_id=None,
                         override_tag_ids=None,
                         task_id=None):
        """
        Return the document object if it was successfully created.
        """

        self.path = path
        self.filename = override_filename or os.path.basename(path)
        self.override_title = override_title
        self.override_correspondent_id = override_correspondent_id
        self.override_document_type_id = override_document_type_id
        self.override_tag_ids = override_tag_ids
        self.task_id = task_id or str(uuid.uuid4())

        self._send_progress(0, 100, 'STARTING', MESSAGE_NEW_FILE)

        # this is for grouping logging entries for this particular file
        # together.

        self.renew_logging_group()

        # Make sure that preconditions for consuming the file are met.

        self.pre_check_file_exists()
        self.pre_check_directories()
        self.pre_check_duplicate()

        self.log("info", f"Consuming {self.filename}")

        # Determine the parser class.

        mime_type = magic.from_file(self.path, mime=True)

        self.log("debug", f"Detected mime type: {mime_type}")

        parser_class = get_parser_class_for_mime_type(mime_type)
        if not parser_class:
            self._fail(
                MESSAGE_UNSUPPORTED_TYPE,
                f"Unsupported mime type {mime_type}"
            )

        # Notify all listeners that we're going to do some work.

        document_consumption_started.send(
            sender=self.__class__,
            filename=self.path,
            logging_group=self.logging_group
        )

        self.run_pre_consume_script()

        def progress_callback(current_progress, max_progress):
            # rescale the parser's progress so it stays between 20 and 70
            # on the overall progress bar
            p = int((current_progress / max_progress) * 50 + 20)
            self._send_progress(p, 100, "WORKING")

        # This doesn't parse the document yet, but gives us a parser.

        document_parser = parser_class(self.logging_group, progress_callback)

        self.log("debug", f"Parser: {type(document_parser).__name__}")

        # However, this already created working directories which we have to
        # clean up.

        # Parse the document. This may take some time.

        text = None
        date = None
        thumbnail = None
        archive_path = None

        try:
            self._send_progress(20, 100, 'WORKING', MESSAGE_PARSING_DOCUMENT)
            self.log("debug", "Parsing {}...".format(self.filename))
            document_parser.parse(self.path, mime_type, self.filename)

            self.log("debug", f"Generating thumbnail for {self.filename}...")
            self._send_progress(70, 100, 'WORKING',
                                MESSAGE_GENERATING_THUMBNAIL)
            thumbnail = document_parser.get_optimised_thumbnail(
                self.path, mime_type, self.filename)

            text = document_parser.get_text()
            date = document_parser.get_date()
            if not date:
                self._send_progress(90, 100, 'WORKING',
                                    MESSAGE_PARSE_DATE)
                date = parse_date(self.filename, text)
            archive_path = document_parser.get_archive_path()

        except ParseError as e:
            document_parser.cleanup()
            self._fail(
                str(e),
                f"Error while consuming document {self.filename}: {e}",
                exc_info=True
            )

        # Prepare the document classifier.

        # TODO: I don't really like to do this here, but this way we avoid
        # reloading the classifier multiple times, since there are multiple
        # post-consume hooks that all require the classifier.

        classifier = load_classifier()

        self._send_progress(95, 100, 'WORKING', MESSAGE_SAVE_DOCUMENT)

        # now that everything is done, we can start to store the document
        # in the system. This will be a transaction and reasonably fast.
        try:
            with transaction.atomic():

                # store the document.
                document = self._store(
                    text=text,
                    date=date,
                    mime_type=mime_type
                )

                # If we get here, it was successful. Proceed with post-consume
                # hooks. If they fail, nothing will get changed.

                document_consumption_finished.send(
                    sender=self.__class__,
                    document=document,
                    logging_group=self.logging_group,
                    classifier=classifier
                )

                # After everything is in the database, copy the files into
                # place. If this fails, we'll also rollback the transaction.

                with FileLock(settings.MEDIA_LOCK):
                    document.filename = generate_unique_filename(document)
                    create_source_path_directory(document.source_path)

                    self._write(document.storage_type,
                                self.path, document.source_path)

                    self._write(document.storage_type,
                                thumbnail, document.thumbnail_path)

                    if archive_path and os.path.isfile(archive_path):
                        document.archive_filename = generate_unique_filename(
                            document,
                            archive_filename=True
                        )
                        create_source_path_directory(document.archive_path)
                        self._write(document.storage_type,
                                    archive_path, document.archive_path)

                        with open(archive_path, 'rb') as f:
                            document.archive_checksum = hashlib.md5(
                                f.read()).hexdigest()

                # Don't save with the lock active. Saving will cause the file
                # renaming logic to acquire the lock as well.
                document.save()

            # Delete the file only if it was successfully consumed
            self.log("debug", "Deleting file {}".format(self.path))
            os.unlink(self.path)

        except Exception as e:
            self._fail(
                str(e),
                f"The following error occurred while consuming "
                f"{self.filename}: {e}",
                exc_info=True
            )
        finally:
            document_parser.cleanup()

        self.run_post_consume_script(document)

        self.log(
            "info",
            "Document {} consumption finished".format(document)
        )

        self._send_progress(100, 100, 'SUCCESS', MESSAGE_FINISHED, document.id)

        return document
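
    # Create the Document database record from the parsed data. Writing the
    # actual files to their final location happens separately, inside the
    # transaction in try_consume_file.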
    def _store(self, text, date, mime_type):

        # If someone gave us the original filename, use it instead of doc.

        file_info = FileInfo.from_filename(self.filename)

        stats = os.stat(self.path)

        self.log("debug", "Saving record to database")

        created = file_info.created or date or timezone.make_aware(
            datetime.datetime.fromtimestamp(stats.st_mtime))

        storage_type = Document.STORAGE_TYPE_UNENCRYPTED

        with open(self.path, "rb") as f:
            document = Document.objects.create(
                title=(self.override_title or file_info.title)[:127],
                content=text,
                mime_type=mime_type,
                checksum=hashlib.md5(f.read()).hexdigest(),
                created=created,
                modified=created,
                storage_type=storage_type
            )

        self.apply_overrides(document)

        document.save()

        return document
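
    # Apply the metadata overrides (correspondent, document type, tags) that
    # the caller handed to try_consume_file.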
    def apply_overrides(self, document):
        if self.override_correspondent_id:
            document.correspondent = Correspondent.objects.get(
                pk=self.override_correspondent_id)

        if self.override_document_type_id:
            document.document_type = DocumentType.objects.get(
                pk=self.override_document_type_id)

        if self.override_tag_ids:
            for tag_id in self.override_tag_ids:
                document.tags.add(Tag.objects.get(pk=tag_id))
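
    # Copy a file's contents to its final destination. Note that the
    # storage_type argument is currently unused here; documents are written
    # unencrypted.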
    def _write(self, storage_type, source, target):
        with open(source, "rb") as read_file:
            with open(target, "wb") as write_file:
                write_file.write(read_file.read())
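

# A minimal usage sketch (illustrative only, not part of this module): the
# consumer is typically driven from a background task, roughly like this.
# The file path and override values below are made-up examples.
#
#     from documents.consumer import Consumer, ConsumerError
#
#     consumer = Consumer()
#     try:
#         document = consumer.try_consume_file(
#             "/usr/src/paperless/consume/scan.pdf",
#             override_title="Utility bill",
#             override_tag_ids=[1, 2],
#         )
#     except ConsumerError:
#         pass  # already logged and reported to the frontend by _fail()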