# paperless-ngx/src/paperless_mail/tests/test_mail.py
#
# 878 lines
# 28 KiB
# Python
# Raw Normal View History

import dataclasses
import email.contentmanager
import email.message
import os
import random
import uuid
from collections import namedtuple
from typing import ContextManager
from typing import List
from typing import Union
from unittest import mock

from django.core.management import call_command
from django.db import DatabaseError
from django.test import TestCase
from documents.models import Correspondent
from documents.tests.utils import DirectoriesMixin
from imap_tools import EmailAddress
from imap_tools import FolderInfo
from imap_tools import MailboxFolderSelectError
from imap_tools import MailMessage
from imap_tools import MailMessageFlags
from paperless_mail import tasks
from paperless_mail.mail import MailAccountHandler
from paperless_mail.mail import MailError
from paperless_mail.models import MailAccount
from paperless_mail.models import MailRule


@dataclasses.dataclass
class _AttachmentDef(object):
filename: str = "a_file.pdf"
maintype: str = "application/pdf"
subtype: str = "pdf"
disposition: str = "attachment"
content: bytes = b"a PDF document"
class BogusFolderManager:
    """Minimal in-memory folder manager used by the fake mailbox.

    Only the folders "INBOX" and "spam" exist.
    """

    # Folder that is selected until ``set`` is called.
    current_folder = "INBOX"

    def set(self, new_folder):
        """Select ``new_folder`` as the current folder.

        Raises:
            MailboxFolderSelectError: if ``new_folder`` is not one of the
                known folders; the current folder is left unchanged.
        """
        known_folders = ("INBOX", "spam")
        if new_folder in known_folders:
            self.current_folder = new_folder
            return
        raise MailboxFolderSelectError(None, "uhm")


class BogusMailBox(ContextManager):
    """In-memory replacement for the IMAP mailbox used by the mail handler.

    Messages live in ``messages`` (the inbox) and ``messages_spam``; the
    methods implement just enough of the mailbox interface for these tests:
    login, fetch with a few search keywords, delete, flag and move.
    """

    def __init__(self):
        self.messages: List[MailMessage] = []
        self.messages_spam: List[MailMessage] = []
        self.folder = BogusFolderManager()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Nothing to release; returning None lets exceptions propagate.
        return None

    def login(self, username, password):
        """Accept only the credentials admin/secret; raise otherwise."""
        if username == "admin" and password == "secret":
            return
        raise Exception()

    def fetch(self, criteria, mark_seen, charset=""):
        """Return the inbox messages matching ``criteria``.

        ``criteria`` is stringified, stripped of surrounding parentheses and
        split on single spaces. The keywords UNSEEN, SUBJECT, BODY, FROM and
        UNFLAGGED are recognised; SUBJECT/BODY/FROM take the following token
        (with double quotes removed) as a substring to look for.
        ``mark_seen`` and ``charset`` are accepted but ignored.
        """
        tokens = str(criteria).strip("()").split(" ")

        def argument_of(keyword):
            # The value of a keyword is the token right after it.
            return tokens[tokens.index(keyword) + 1].strip('"')

        checks = []
        if "UNSEEN" in tokens:
            checks.append(lambda m: not m.seen)
        if "SUBJECT" in tokens:
            subject = argument_of("SUBJECT")
            checks.append(lambda m: subject in m.subject)
        if "BODY" in tokens:
            body = argument_of("BODY")
            checks.append(lambda m: body in m.text)
        if "FROM" in tokens:
            from_ = argument_of("FROM")
            checks.append(lambda m: from_ in m.from_)
        if "UNFLAGGED" in tokens:
            checks.append(lambda m: not m.flagged)

        # A message is returned only if it passes every requested check.
        return [m for m in self.messages if all(check(m) for check in checks)]

    def delete(self, uid_list):
        """Drop every inbox message whose uid is in ``uid_list``."""
        self.messages = [m for m in self.messages if m.uid not in uid_list]

    def flag(self, uid_list, flag_set, value):
        """Set the FLAGGED and/or SEEN state of the listed messages to ``value``."""
        for message in self.messages:
            if message.uid not in uid_list:
                continue
            for flag in flag_set:
                if flag == MailMessageFlags.FLAGGED:
                    message.flagged = value
                if flag == MailMessageFlags.SEEN:
                    message.seen = value

    def move(self, uid_list, folder):
        """Move the listed inbox messages to "spam"; any other folder raises."""
        if folder != "spam":
            raise Exception()
        self.messages_spam += [m for m in self.messages if m.uid in uid_list]
        self.messages = [m for m in self.messages if m.uid not in uid_list]


# UIDs already handed out by ``create_message``; consulted so that every
# generated message receives a distinct UID.
_used_uids = set()


def create_message(
    attachments: Union[int, List[_AttachmentDef]] = 1,
    body: str = "",
    subject: str = "the suject",
    from_: str = "noone@mail.com",
    seen: bool = False,
    flagged: bool = False,
) -> MailMessage:
    """Build an imap_tools ``MailMessage`` for use in tests.

    Args:
        attachments: either a number of default PDF attachments to add
            (named ``file_0.pdf``, ``file_1.pdf``, ...) or an explicit list of
            attachment definitions.
        body: text content of the message.
        subject: value of the Subject header.
        from_: value of the From header.
        seen: value assigned to the ``seen`` attribute of the result.
        flagged: value assigned to the ``flagged`` attribute of the result.

    Returns:
        The message, parsed from the serialized e-mail, carrying a UID that
        no earlier call of this function has used.

    Raises:
        RuntimeError: if every UID of the pool has already been handed out.
    """
    email_msg = email.message.EmailMessage()
    # TODO: This does NOT set the UID
    email_msg["Message-ID"] = str(uuid.uuid4())
    email_msg["Subject"] = subject
    email_msg["From"] = from_
    email_msg.set_content(body)

    # Either add some default number of attachments
    # or the provided attachments
    if isinstance(attachments, int):
        attachments = [
            _AttachmentDef(filename=f"file_{i}.pdf") for i in range(attachments)
        ]

    for attachment in attachments:
        email_msg.add_attachment(
            attachment.content,
            maintype=attachment.maintype,
            subtype=attachment.subtype,
            disposition=attachment.disposition,
            filename=attachment.filename,
        )

    # Convert the EmailMessage to an imap_tools MailMessage
    imap_msg = MailMessage.from_bytes(email_msg.as_bytes())

    # TODO: Unsure how to add a uid to the actual EmailMessage.  This hacks it in,
    #  based on how imap_tools uses regex to extract it.
    #  This should be a large enough pool
    uid_pool_size = 10000
    if len(_used_uids) >= uid_pool_size:
        # Without this guard the retry loop below would never terminate.
        raise RuntimeError("create_message ran out of unused UIDs")
    uid = random.randint(1, uid_pool_size)
    while uid in _used_uids:
        uid = random.randint(1, uid_pool_size)
    _used_uids.add(uid)

    imap_msg._raw_uid_data = f"UID {uid}".encode()

    imap_msg.seen = seen
    imap_msg.flagged = flagged

    return imap_msg


def fake_magic_from_buffer(buffer, mime=False):
    """Deterministic stand-in for ``magic.from_buffer``.

    With ``mime=True`` the result is "application/pdf" when the text "PDF"
    occurs in ``str(buffer)`` and "unknown/type" otherwise. Without ``mime``
    a fixed description string is returned.
    """
    if not mime:
        return "Some verbose file description"
    return "application/pdf" if "PDF" in str(buffer) else "unknown/type"


@mock.patch("paperless_mail.mail.magic.from_buffer", fake_magic_from_buffer)
class TestMail(DirectoriesMixin, TestCase):
    """Tests for ``MailAccountHandler`` running against ``BogusMailBox``.

    ``setUp`` replaces ``paperless_mail.mail.MailBox`` with the fake mailbox
    and ``paperless_mail.mail.async_task`` with a mock, so no IMAP server is
    contacted and queued consumption tasks can be inspected.
    """

    def setUp(self):
        """Install the patches, seed the fake inbox and create the handler."""
        patcher = mock.patch("paperless_mail.mail.MailBox")
        m = patcher.start()
        self.bogus_mailbox = BogusMailBox()
        m.return_value = self.bogus_mailbox
        self.addCleanup(patcher.stop)

        patcher = mock.patch("paperless_mail.mail.async_task")
        self.async_task = patcher.start()
        self.addCleanup(patcher.stop)

        self.reset_bogus_mailbox()

        self.mail_account_handler = MailAccountHandler()

        super().setUp()

    def reset_bogus_mailbox(self):
        """Restore the fake mailbox to its three standard inbox messages."""
        self.bogus_mailbox.messages = []
        self.bogus_mailbox.messages_spam = []
        self.bogus_mailbox.messages.append(
            create_message(
                subject="Invoice 1",
                from_="amazon@amazon.de",
                body="cables",
                seen=True,
                flagged=False,
            ),
        )
        self.bogus_mailbox.messages.append(
            create_message(
                subject="Invoice 2",
                body="from my favorite electronic store",
                seen=False,
                flagged=True,
            ),
        )
        self.bogus_mailbox.messages.append(
            create_message(
                subject="Claim your $10M price now!",
                from_="amazon@amazon-some-indian-site.org",
                seen=False,
            ),
        )

    def test_get_correspondent(self):
        """Every correspondent source of a rule yields the expected result."""
        # The empty namedtuple classes only serve as attribute containers.
        message = namedtuple("MailMessage", [])
        message.from_ = "someone@somewhere.com"
        message.from_values = EmailAddress(
            "Someone!",
            "someone@somewhere.com",
            "Someone! <someone@somewhere.com>",
        )

        message2 = namedtuple("MailMessage", [])
        message2.from_ = "me@localhost.com"
        message2.from_values = EmailAddress(
            "",
            "fake@localhost.com",
            "",
        )

        me_localhost = Correspondent.objects.create(name=message2.from_)
        someone_else = Correspondent.objects.create(name="someone else")

        handler = MailAccountHandler()

        rule = MailRule(
            name="a",
            assign_correspondent_from=MailRule.CorrespondentSource.FROM_NOTHING,
        )
        self.assertIsNone(handler.get_correspondent(message, rule))

        rule = MailRule(
            name="b",
            assign_correspondent_from=MailRule.CorrespondentSource.FROM_EMAIL,
        )
        c = handler.get_correspondent(message, rule)
        self.assertIsNotNone(c)
        self.assertEqual(c.name, "someone@somewhere.com")
        c = handler.get_correspondent(message2, rule)
        self.assertIsNotNone(c)
        self.assertEqual(c.name, "me@localhost.com")
        self.assertEqual(c.id, me_localhost.id)

        rule = MailRule(
            name="c",
            assign_correspondent_from=MailRule.CorrespondentSource.FROM_NAME,
        )
        c = handler.get_correspondent(message, rule)
        self.assertIsNotNone(c)
        self.assertEqual(c.name, "Someone!")
        c = handler.get_correspondent(message2, rule)
        self.assertIsNotNone(c)
        self.assertEqual(c.id, me_localhost.id)

        rule = MailRule(
            name="d",
            assign_correspondent_from=MailRule.CorrespondentSource.FROM_CUSTOM,
            assign_correspondent=someone_else,
        )
        c = handler.get_correspondent(message, rule)
        self.assertEqual(c, someone_else)

    def test_get_title(self):
        """The title comes from the file name or the subject, per rule."""
        message = namedtuple("MailMessage", [])
        message.subject = "the message title"
        att = namedtuple("Attachment", [])
        att.filename = "this_is_the_file.pdf"

        handler = MailAccountHandler()

        rule = MailRule(
            name="a",
            assign_title_from=MailRule.TitleSource.FROM_FILENAME,
        )
        self.assertEqual(handler.get_title(message, att, rule), "this_is_the_file")
        rule = MailRule(
            name="b",
            assign_title_from=MailRule.TitleSource.FROM_SUBJECT,
        )
        self.assertEqual(handler.get_title(message, att, rule), "the message title")

    def test_handle_message(self):
        """A message with two PDF attachments queues two consumption tasks."""
        message = create_message(
            subject="the message title",
            from_="Myself",
            attachments=2,
        )

        account = MailAccount()
        account.save()
        rule = MailRule(
            assign_title_from=MailRule.TitleSource.FROM_FILENAME,
            account=account,
        )
        rule.save()

        result = self.mail_account_handler.handle_message(message, rule)

        self.assertEqual(result, 2)

        self.assertEqual(len(self.async_task.call_args_list), 2)

        _, kwargs1 = self.async_task.call_args_list[0]
        _, kwargs2 = self.async_task.call_args_list[1]

        self.assertTrue(os.path.isfile(kwargs1["path"]), kwargs1["path"])

        self.assertEqual(kwargs1["override_title"], "file_0")
        self.assertEqual(kwargs1["override_filename"], "file_0.pdf")

        # Report the path that was actually checked if this assertion fails.
        self.assertTrue(os.path.isfile(kwargs2["path"]), kwargs2["path"])

        self.assertEqual(kwargs2["override_title"], "file_1")
        self.assertEqual(kwargs2["override_filename"], "file_1.pdf")

    def test_handle_empty_message(self):
        """A message without attachments queues nothing and returns 0."""
        message = namedtuple("MailMessage", [])

        message.attachments = []
        rule = MailRule()

        result = self.mail_account_handler.handle_message(message, rule)

        self.assertFalse(self.async_task.called)
        self.assertEqual(result, 0)

    def test_handle_unknown_mime_type(self):
        """Only the attachment detected as PDF is queued for consumption."""
        message = create_message(
            attachments=[
                _AttachmentDef(filename="f1.pdf"),
                _AttachmentDef(
                    filename="f2.json",
                    content=b"{'much': 'payload.', 'so': 'json', 'wow': true}",
                ),
            ],
        )

        account = MailAccount()
        account.save()
        rule = MailRule(
            assign_title_from=MailRule.TitleSource.FROM_FILENAME,
            account=account,
        )
        rule.save()

        result = self.mail_account_handler.handle_message(message, rule)

        self.assertEqual(result, 1)
        self.assertEqual(self.async_task.call_count, 1)

        _, kwargs = self.async_task.call_args
        self.assertTrue(os.path.isfile(kwargs["path"]), kwargs["path"])
        self.assertEqual(kwargs["override_filename"], "f1.pdf")

    def test_handle_disposition(self):
        """With a default rule, the inline attachment is not consumed."""
        message = create_message(
            attachments=[
                _AttachmentDef(
                    filename="f1.pdf",
                    disposition="inline",
                ),
                _AttachmentDef(filename="f2.pdf"),
            ],
        )

        account = MailAccount()
        account.save()
        rule = MailRule(
            assign_title_from=MailRule.TitleSource.FROM_FILENAME,
            account=account,
        )
        rule.save()

        result = self.mail_account_handler.handle_message(message, rule)

        self.assertEqual(result, 1)
        self.assertEqual(self.async_task.call_count, 1)

        _, kwargs = self.async_task.call_args
        self.assertEqual(kwargs["override_filename"], "f2.pdf")

    def test_handle_inline_files(self):
        """With attachment_type EVERYTHING, inline files are consumed too."""
        message = create_message(
            attachments=[
                _AttachmentDef(
                    filename="f1.pdf",
                    disposition="inline",
                ),
                _AttachmentDef(filename="f2.pdf"),
            ],
        )

        account = MailAccount()
        account.save()
        rule = MailRule(
            assign_title_from=MailRule.TitleSource.FROM_FILENAME,
            account=account,
            attachment_type=MailRule.AttachmentProcessing.EVERYTHING,
        )
        rule.save()

        result = self.mail_account_handler.handle_message(message, rule)

        self.assertEqual(result, 2)
        self.assertEqual(self.async_task.call_count, 2)

    def test_filename_filter(self):
        """Each filename pattern selects exactly the expected attachments."""
        message = create_message(
            attachments=[
                _AttachmentDef(filename="f1.pdf"),
                _AttachmentDef(filename="f2.pdf"),
                _AttachmentDef(filename="f3.pdf"),
                _AttachmentDef(filename="f2.png"),
                _AttachmentDef(filename="file.PDf"),
                _AttachmentDef(filename="f1.Pdf"),
            ],
        )

        # (pattern, file names expected to be consumed)
        tests = [
            ("*.pdf", ["f1.pdf", "f1.Pdf", "f2.pdf", "f3.pdf", "file.PDf"]),
            ("f1.pdf", ["f1.pdf", "f1.Pdf"]),
            ("f1", []),
            ("*", ["f1.pdf", "f2.pdf", "f3.pdf", "f2.png", "f1.Pdf", "file.PDf"]),
            ("*.png", ["f2.png"]),
        ]

        for pattern, matches in tests:
            matches.sort()
            self.async_task.reset_mock()
            # Random names keep accounts and rules of the iterations distinct.
            account = MailAccount(name=str(uuid.uuid4()))
            account.save()
            rule = MailRule(
                name=str(uuid.uuid4()),
                assign_title_from=MailRule.TitleSource.FROM_FILENAME,
                account=account,
                filter_attachment_filename=pattern,
            )
            rule.save()

            result = self.mail_account_handler.handle_message(message, rule)

            self.assertEqual(result, len(matches), f"Error with pattern: {pattern}")
            filenames = sorted(
                kwargs["override_filename"]
                for _, kwargs in self.async_task.call_args_list
            )
            self.assertListEqual(filenames, matches)

    def test_handle_mail_account_mark_read(self):
        """MARK_READ: unseen mails are consumed and marked seen, none removed."""
        account = MailAccount.objects.create(
            name="test",
            imap_server="",
            username="admin",
            password="secret",
        )

        _ = MailRule.objects.create(
            name="testrule",
            account=account,
            action=MailRule.MailAction.MARK_READ,
        )

        self.assertEqual(len(self.bogus_mailbox.messages), 3)
        self.assertEqual(self.async_task.call_count, 0)
        self.assertEqual(len(self.bogus_mailbox.fetch("UNSEEN", False)), 2)

        self.mail_account_handler.handle_mail_account(account)

        self.assertEqual(self.async_task.call_count, 2)
        self.assertEqual(len(self.bogus_mailbox.fetch("UNSEEN", False)), 0)
        self.assertEqual(len(self.bogus_mailbox.messages), 3)

    def test_handle_mail_account_delete(self):
        """DELETE: both "Invoice" mails are consumed and removed."""
        account = MailAccount.objects.create(
            name="test",
            imap_server="",
            username="admin",
            password="secret",
        )

        _ = MailRule.objects.create(
            name="testrule",
            account=account,
            action=MailRule.MailAction.DELETE,
            filter_subject="Invoice",
        )

        self.assertEqual(self.async_task.call_count, 0)
        self.assertEqual(len(self.bogus_mailbox.messages), 3)

        self.mail_account_handler.handle_mail_account(account)

        self.assertEqual(self.async_task.call_count, 2)
        self.assertEqual(len(self.bogus_mailbox.messages), 1)

    def test_handle_mail_account_flag(self):
        """FLAG: the unflagged "Invoice" mail is consumed and flagged."""
        account = MailAccount.objects.create(
            name="test",
            imap_server="",
            username="admin",
            password="secret",
        )

        _ = MailRule.objects.create(
            name="testrule",
            account=account,
            action=MailRule.MailAction.FLAG,
            filter_subject="Invoice",
        )

        self.assertEqual(len(self.bogus_mailbox.messages), 3)
        self.assertEqual(self.async_task.call_count, 0)
        self.assertEqual(len(self.bogus_mailbox.fetch("UNFLAGGED", False)), 2)

        self.mail_account_handler.handle_mail_account(account)

        self.assertEqual(self.async_task.call_count, 1)
        self.assertEqual(len(self.bogus_mailbox.fetch("UNFLAGGED", False)), 1)
        self.assertEqual(len(self.bogus_mailbox.messages), 3)

    def test_handle_mail_account_move(self):
        """MOVE: the "Claim" mail is consumed and moved to the spam folder."""
        account = MailAccount.objects.create(
            name="test",
            imap_server="",
            username="admin",
            password="secret",
        )

        _ = MailRule.objects.create(
            name="testrule",
            account=account,
            action=MailRule.MailAction.MOVE,
            action_parameter="spam",
            filter_subject="Claim",
        )

        self.assertEqual(self.async_task.call_count, 0)
        self.assertEqual(len(self.bogus_mailbox.messages), 3)
        self.assertEqual(len(self.bogus_mailbox.messages_spam), 0)

        self.mail_account_handler.handle_mail_account(account)

        self.assertEqual(self.async_task.call_count, 1)
        self.assertEqual(len(self.bogus_mailbox.messages), 2)
        self.assertEqual(len(self.bogus_mailbox.messages_spam), 1)

    def test_error_login(self):
        """Wrong credentials surface as a MailError about authentication."""
        account = MailAccount.objects.create(
            name="test",
            imap_server="",
            username="admin",
            password="wrong",
        )

        with self.assertRaises(MailError) as context:
            self.mail_account_handler.handle_mail_account(account)

        # The check has to run after the ``with`` block (statements following
        # the raising call inside it never execute) and has to look at the
        # caught exception rather than at the context manager object.
        self.assertTrue(
            str(context.exception).startswith("Error while authenticating account"),
        )

    def test_error_skip_account(self):
        """An account that fails to log in does not block other accounts."""
        _ = MailAccount.objects.create(
            name="test",
            imap_server="",
            username="admin",
            password="wroasdng",
        )

        account = MailAccount.objects.create(
            name="test2",
            imap_server="",
            username="admin",
            password="secret",
        )
        _ = MailRule.objects.create(
            name="testrule",
            account=account,
            action=MailRule.MailAction.MOVE,
            action_parameter="spam",
            filter_subject="Claim",
        )

        tasks.process_mail_accounts()

        self.assertEqual(self.async_task.call_count, 1)
        self.assertEqual(len(self.bogus_mailbox.messages), 2)
        self.assertEqual(len(self.bogus_mailbox.messages_spam), 1)

    def test_error_skip_rule(self):
        """A rule with an unknown folder is skipped; the next rule still runs."""
        account = MailAccount.objects.create(
            name="test2",
            imap_server="",
            username="admin",
            password="secret",
        )
        _ = MailRule.objects.create(
            name="testrule",
            account=account,
            action=MailRule.MailAction.MOVE,
            action_parameter="spam",
            filter_subject="Claim",
            order=1,
            folder="uuuhhhh",
        )
        _ = MailRule.objects.create(
            name="testrule2",
            account=account,
            action=MailRule.MailAction.MOVE,
            action_parameter="spam",
            filter_subject="Claim",
            order=2,
        )

        self.mail_account_handler.handle_mail_account(account)

        self.assertEqual(self.async_task.call_count, 1)
        self.assertEqual(len(self.bogus_mailbox.messages), 2)
        self.assertEqual(len(self.bogus_mailbox.messages_spam), 1)

    def test_error_folder_set(self):
        """
        GIVEN:
            - Mail rule with non-existent folder
        THEN:
            - Should call list to output all folders in the account
            - Should not process any messages
        """
        account = MailAccount.objects.create(
            name="test2",
            imap_server="",
            username="admin",
            password="secret",
        )
        _ = MailRule.objects.create(
            name="testrule",
            account=account,
            # Same enum as every other rule in this module.
            action=MailRule.MailAction.MOVE,
            action_parameter="spam",
            filter_subject="Claim",
            order=1,
            folder="uuuhhhh",  # Invalid folder name
        )

        self.bogus_mailbox.folder.list = mock.Mock(
            return_value=[FolderInfo("SomeFoldername", "|", ())],
        )

        self.mail_account_handler.handle_mail_account(account)

        self.bogus_mailbox.folder.list.assert_called_once()
        self.assertEqual(self.async_task.call_count, 0)

    def test_error_folder_set_error_listing(self):
        """
        GIVEN:
            - Mail rule with non-existent folder
            - Mail account folder listing raises exception
        THEN:
            - Should not process any messages
        """
        account = MailAccount.objects.create(
            name="test2",
            imap_server="",
            username="admin",
            password="secret",
        )
        _ = MailRule.objects.create(
            name="testrule",
            account=account,
            # Same enum as every other rule in this module.
            action=MailRule.MailAction.MOVE,
            action_parameter="spam",
            filter_subject="Claim",
            order=1,
            folder="uuuhhhh",  # Invalid folder name
        )

        self.bogus_mailbox.folder.list = mock.Mock(
            side_effect=MailboxFolderSelectError(None, "uhm"),
        )

        self.mail_account_handler.handle_mail_account(account)

        self.bogus_mailbox.folder.list.assert_called_once()
        self.assertEqual(self.async_task.call_count, 0)

    @mock.patch("paperless_mail.mail.MailAccountHandler.get_correspondent")
    def test_error_skip_mail(self, m):
        """A mail whose handling raises is left alone; the others are consumed."""

        def get_correspondent_fake(message, rule):
            if message.from_ == "amazon@amazon.de":
                raise ValueError("Does not compute.")
            else:
                return None

        m.side_effect = get_correspondent_fake

        account = MailAccount.objects.create(
            name="test2",
            imap_server="",
            username="admin",
            password="secret",
        )
        _ = MailRule.objects.create(
            name="testrule",
            account=account,
            action=MailRule.MailAction.MOVE,
            action_parameter="spam",
        )

        self.mail_account_handler.handle_mail_account(account)

        # test that we still consume mail even if some mails throw errors.
        self.assertEqual(self.async_task.call_count, 2)

        # faulty mail still in inbox, untouched
        self.assertEqual(len(self.bogus_mailbox.messages), 1)
        self.assertEqual(self.bogus_mailbox.messages[0].from_, "amazon@amazon.de")

    def test_error_create_correspondent(self):
        """A database error while creating the correspondent yields None."""
        account = MailAccount.objects.create(
            name="test2",
            imap_server="",
            username="admin",
            password="secret",
        )
        _ = MailRule.objects.create(
            name="testrule",
            filter_from="amazon@amazon.de",
            account=account,
            action=MailRule.MailAction.MOVE,
            action_parameter="spam",
            assign_correspondent_from=MailRule.CorrespondentSource.FROM_EMAIL,
        )

        self.mail_account_handler.handle_mail_account(account)

        self.async_task.assert_called_once()
        _, kwargs = self.async_task.call_args

        c = Correspondent.objects.get(name="amazon@amazon.de")
        # should work
        self.assertEqual(kwargs["override_correspondent_id"], c.id)

        self.async_task.reset_mock()
        self.reset_bogus_mailbox()

        with mock.patch("paperless_mail.mail.Correspondent.objects.get_or_create") as m:
            m.side_effect = DatabaseError()

            self.mail_account_handler.handle_mail_account(account)

        # Assert the call happened before unpacking ``call_args``; otherwise a
        # missing call would surface as a TypeError on ``None``.
        self.async_task.assert_called_once()
        _, kwargs = self.async_task.call_args
        self.assertIsNone(kwargs["override_correspondent_id"])

    def test_filters(self):
        """Subject, body and from filters select the expected messages."""
        account = MailAccount.objects.create(
            name="test3",
            imap_server="",
            username="admin",
            password="secret",
        )
        rule = MailRule.objects.create(
            name="testrule3",
            account=account,
            action=MailRule.MailAction.DELETE,
            filter_subject="Claim",
        )

        # ``async_task`` is never reset below, so its call count accumulates.
        self.assertEqual(self.async_task.call_count, 0)

        self.assertEqual(len(self.bogus_mailbox.messages), 3)
        self.mail_account_handler.handle_mail_account(account)
        self.assertEqual(len(self.bogus_mailbox.messages), 2)
        self.assertEqual(self.async_task.call_count, 1)

        self.reset_bogus_mailbox()

        rule.filter_subject = None
        rule.filter_body = "electronic"
        rule.save()
        self.assertEqual(len(self.bogus_mailbox.messages), 3)
        self.mail_account_handler.handle_mail_account(account)
        self.assertEqual(len(self.bogus_mailbox.messages), 2)
        self.assertEqual(self.async_task.call_count, 2)

        self.reset_bogus_mailbox()

        rule.filter_from = "amazon"
        rule.filter_body = None
        rule.save()
        self.assertEqual(len(self.bogus_mailbox.messages), 3)
        self.mail_account_handler.handle_mail_account(account)
        self.assertEqual(len(self.bogus_mailbox.messages), 1)
        self.assertEqual(self.async_task.call_count, 4)

        self.reset_bogus_mailbox()

        rule.filter_from = "amazon"
        rule.filter_body = "cables"
        rule.filter_subject = "Invoice"
        rule.save()
        self.assertEqual(len(self.bogus_mailbox.messages), 3)
        self.mail_account_handler.handle_mail_account(account)
        self.assertEqual(len(self.bogus_mailbox.messages), 2)
        self.assertEqual(self.async_task.call_count, 5)


class TestManagementCommand(TestCase):
    """Tests for the ``mail_fetcher`` management command."""

    @mock.patch(
        "paperless_mail.management.commands.mail_fetcher.tasks.process_mail_accounts",
    )
    def test_mail_fetcher(self, process_mail_accounts_mock):
        """Running the command calls ``process_mail_accounts`` exactly once."""
        call_command("mail_fetcher")

        process_mail_accounts_mock.assert_called_once()


class TestTasks(TestCase):
    """Tests for the task functions in ``paperless_mail.tasks``."""

    @mock.patch("paperless_mail.tasks.MailAccountHandler.handle_mail_account")
    def test_all_accounts(self, handle_mock):
        """Every account is handled and the result message reflects the total."""
        handle_mock.side_effect = lambda account: 6

        for account_name in ("A", "B"):
            MailAccount.objects.create(
                name=account_name,
                imap_server="A",
                username="A",
                password="A",
            )

        result = tasks.process_mail_accounts()

        self.assertEqual(handle_mock.call_count, 2)
        # Two accounts reporting 6 each.
        self.assertIn("Added 12", result)

        handle_mock.side_effect = lambda account: 0
        result = tasks.process_mail_accounts()
        self.assertIn("No new", result)

    @mock.patch("paperless_mail.tasks.MailAccountHandler.handle_mail_account")
    def test_single_accounts(self, handle_mock):
        """Only an existing account name triggers the account handler."""
        MailAccount.objects.create(
            name="A",
            imap_server="A",
            username="A",
            password="A",
        )

        tasks.process_mail_account("A")
        handle_mock.assert_called_once()

        handle_mock.reset_mock()

        # No account named "B" was created above.
        tasks.process_mail_account("B")
        handle_mock.assert_not_called()