paperless-ngx/src/documents/management/commands/document_consumer.py

148 lines
4.7 KiB
Python
Raw Normal View History

2015-12-26 13:21:33 +00:00
import datetime
import logging
2015-12-20 19:23:33 +00:00
import os
2018-05-11 14:01:21 +02:00
import sys
import time
2015-12-20 19:23:33 +00:00
from django.conf import settings
from django.core.management.base import BaseCommand, CommandError
2015-12-20 19:23:33 +00:00
2018-05-21 00:35:34 +02:00
from ...consumer import Consumer, ConsumerError
from ...mail import MailFetcher, MailFetcherError
2016-01-23 02:33:29 +00:00
2018-05-11 14:01:21 +02:00
try:
from inotify_simple import INotify, flags
except ImportError:
pass
2016-01-23 02:33:29 +00:00
class Command(BaseCommand):
    """
    On every iteration of an infinite loop, consume what we can from the
    consumption directory, and fetch any mail available.

    When ``inotify_simple`` has been imported and ``--no-inotify`` is not
    given, the consumption directory is watched for filesystem events
    instead of being polled every ``--loop-time`` seconds.
    """

    ORIGINAL_DOCS = os.path.join(settings.MEDIA_ROOT, "documents", "originals")
    THUMB_DOCS = os.path.join(settings.MEDIA_ROOT, "documents", "thumbnails")

    def __init__(self, *args, **kwargs):
        """Set up per-run state, then defer to ``BaseCommand``."""
        self.verbosity = 0
        self.file_consumer = None
        self.mail_fetcher = None
        # Forces a mail fetch on the very first loop step.
        self.first_iteration = True

        BaseCommand.__init__(self, *args, **kwargs)

    def add_arguments(self, parser):
        """Register the command-line arguments understood by this command."""
        parser.add_argument(
            "directory",
            default=settings.CONSUMPTION_DIR,
            nargs="?",
            help="The consumption directory."
        )
        parser.add_argument(
            "--loop-time",
            default=settings.CONSUMER_LOOP_TIME,
            type=int,
            help="Wait time between each loop (in seconds)."
        )
        parser.add_argument(
            "--mail-delta",
            default=10,
            type=int,
            help="Wait time between each mail fetch (in minutes)."
        )
        parser.add_argument(
            "--oneshot",
            action="store_true",
            help="Run only once."
        )
        parser.add_argument(
            "--no-inotify",
            action="store_true",
            help="Don't use inotify, even if it's available."
        )

    def handle(self, *args, **options):
        """
        Entry point: build the consumer and mail fetcher, make sure the
        storage directories exist, then run once or loop until interrupted.

        Raises ``CommandError`` if the consumer or the mail fetcher cannot be
        constructed.
        """
        self.verbosity = options["verbosity"]
        directory = options["directory"]
        loop_time = options["loop_time"]
        # --mail-delta is given in minutes; everything below works in seconds.
        mail_delta = options["mail_delta"] * 60
        # The module is only present in sys.modules if the optional import at
        # the top of this file succeeded.
        use_inotify = (not options["no_inotify"]
                       and "inotify_simple" in sys.modules)

        try:
            self.file_consumer = Consumer(consume=directory)
            self.mail_fetcher = MailFetcher(consume=directory)
        except (ConsumerError, MailFetcherError) as e:
            raise CommandError(e)

        for d in (self.ORIGINAL_DOCS, self.THUMB_DOCS):
            # The keyword is ``exist_ok``; the previous ``exists_ok`` spelling
            # raised TypeError on every run.
            os.makedirs(d, exist_ok=True)

        logging.getLogger(__name__).info(
            "Starting document consumer at {}{}".format(
                directory,
                " with inotify" if use_inotify else ""
            )
        )

        if options["oneshot"]:
            self.loop_step(mail_delta)
        else:
            try:
                if use_inotify:
                    self.loop_inotify(mail_delta)
                else:
                    self.loop(loop_time, mail_delta)
            except KeyboardInterrupt:
                print("Exiting")

    def loop(self, loop_time, mail_delta):
        """Poll forever, starting one loop step every ``loop_time`` seconds."""
        while True:
            start_time = time.time()
            if self.verbosity > 1:
                print(".", int(start_time))
            self.loop_step(mail_delta, start_time)
            # Sleep until the start of the next loop step
            time.sleep(max(0, start_time + loop_time - time.time()))

    def loop_step(self, mail_delta, time_now=None):
        """
        Run a single iteration: fetch mail if it is due, then consume any new
        files in the consumption directory.

        ``mail_delta`` is the minimum number of seconds between mail fetches.
        ``time_now`` is the current timestamp; it defaults to ``time.time()``
        so that callers that omit it never compare ``None`` against a number.
        """
        if time_now is None:
            time_now = time.time()

        # Occasionally fetch mail and store it to be consumed on the next loop
        # We fetch email when we first start up so that it is not necessary to
        # wait for 10 minutes after making changes to the config file.
        next_mail_time = self.mail_fetcher.last_checked + mail_delta
        if self.first_iteration or time_now > next_mail_time:
            self.first_iteration = False
            self.mail_fetcher.pull()

        self.file_consumer.consume_new_files()

    def loop_inotify(self, mail_delta):
        """
        Watch the consumption directory with inotify and consume files as
        they are written or moved into it, fetching mail every ``mail_delta``
        seconds in between.
        """
        directory = self.file_consumer.consume
        inotify = INotify()
        try:
            inotify.add_watch(directory, flags.CLOSE_WRITE | flags.MOVED_TO)

            # Run initial mail fetch and consume all currently existing
            # documents
            self.loop_step(mail_delta)
            next_mail_time = self.mail_fetcher.last_checked + mail_delta

            while True:
                # Consume documents until next_mail_time
                while True:
                    delta = next_mail_time - time.time()
                    if delta <= 0:
                        break
                    # inotify_simple takes its timeout in milliseconds while
                    # delta is in seconds; clamp to at least 1 ms so the wait
                    # never degenerates into a non-blocking busy loop.
                    timeout_ms = max(1, int(delta * 1000))
                    for event in inotify.read(timeout=timeout_ms):
                        path = os.path.join(directory, event.name)
                        if os.path.isfile(path):
                            self.file_consumer.try_consume_file(path)

                self.mail_fetcher.pull()
                next_mail_time = self.mail_fetcher.last_checked + mail_delta
        finally:
            # Release the inotify file descriptor when the loop is left,
            # e.g. on KeyboardInterrupt.
            inotify.close()