diff --git a/CHANGELOG.md b/CHANGELOG.md index 7187fcd..f6242f7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,9 @@ +6.5.0 +----- + +- Move mail processing functions to `mailsuite` package +- Properly set timeout when querying DNS (closes issue #79 and #92) + 6.4.2 ----- diff --git a/parsedmarc/__init__.py b/parsedmarc/__init__.py index ebba721..44da455 100644 --- a/parsedmarc/__init__.py +++ b/parsedmarc/__init__.py @@ -914,8 +914,21 @@ def get_dmarc_reports_from_inbox(connection=None, ssl=ssl, verify=verify, initial_folder=reports_folder) + server.create_folder(archive_folder) + server.create_folder(aggregate_reports_folder) + server.create_folder(forensic_reports_folder) + server.create_folder(invalid_reports_folder) + messages = server.search() - for msg_uid in messages: + total_messages = len(messages) + logger.debug("Found {0} messages in {1}".format(len(messages), + reports_folder)) + for i in range(len(messages)): + msg_uid = messages[i] + logger.debug("Processing message {0} of {1}: UID {2}".format( + i+1, total_messages, msg_uid + + )) msg_content = server.fetch_message(msg_uid, parse=False) sa = strip_attachment_payloads try: @@ -942,66 +955,66 @@ def get_dmarc_reports_from_inbox(connection=None, msg_uid, invalid_reports_folder)) server.move_messages([msg_uid], invalid_reports_folder) - if not test: - if delete: - processed_messages = aggregate_report_msg_uids + \ - forensic_report_msg_uids + if not test: + if delete: + processed_messages = aggregate_report_msg_uids + \ + forensic_report_msg_uids - number_of_processed_msgs = len(processed_messages) - for i in range(number_of_processed_msgs): - msg_uid = processed_messages[i] + number_of_processed_msgs = len(processed_messages) + for i in range(number_of_processed_msgs): + msg_uid = processed_messages[i] + logger.debug( + "Deleting message {0} of {1}: UID {2}".format( + i + 1, number_of_processed_msgs, msg_uid)) + try: + server.delete_messages([msg_uid]) + + except Exception as e: + message = "Error deleting message UID" + e = "{0} {1}: " "{2}".format(message, msg_uid, e) + logger.error("IMAP error: {0}".format(e)) + else: + if len(aggregate_report_msg_uids) > 0: + log_message = "Moving aggregate report messages from" + logger.debug( + "{0} {1} to {2}".format( + log_message, reports_folder, + aggregate_reports_folder)) + number_of_agg_report_msgs = len(aggregate_report_msg_uids) + for i in range(number_of_agg_report_msgs): + msg_uid = aggregate_report_msg_uids[i] logger.debug( - "Deleting message {0} of {1}: UID {2}".format( - i + 1, number_of_processed_msgs, msg_uid)) + "Moving message {0} of {1}: UID {2}".format( + i+1, number_of_agg_report_msgs, msg_uid)) try: - server.delete_messages([msg_uid]) - + server.move_messages([msg_uid], + aggregate_reports_folder) except Exception as e: - message = "Error deleting message UID" - e = "{0} {1}: " "{2}".format(message, msg_uid, e) + message = "Error moving message UID" + e = "{0} {1}: {2}".format(message, msg_uid, e) logger.error("IMAP error: {0}".format(e)) - else: - if len(aggregate_report_msg_uids) > 0: - log_message = "Moving aggregate report messages from" - logger.debug( - "{0} {1} to {2}".format( - log_message, reports_folder, - aggregate_reports_folder)) - number_of_agg_report_msgs = len(aggregate_report_msg_uids) - for i in range(number_of_agg_report_msgs): - msg_uid = aggregate_report_msg_uids[i] - logger.debug( - "Moving message {0} of {1}: UID {2}".format( - i+1, number_of_agg_report_msgs, msg_uid)) - try: - server.move_messages([msg_uid], - aggregate_reports_folder) - except Exception as e: - message = "Error moving message UID" - e = "{0} {1}: {2}".format(message, msg_uid, e) - logger.error("IMAP error: {0}".format(e)) - if len(forensic_report_msg_uids) > 0: - message = "Moving forensic report messages from" - logger.debug( - "{0} {1} to {2}".format(message, - reports_folder, - forensic_reports_folder)) - number_of_forensic_msgs = len(forensic_report_msg_uids) - for i in range(number_of_forensic_msgs): - msg_uid = forensic_report_msg_uids[i] - message = "Moving message" - logger.debug("{0} {1} of {2}: UID {2}".format( - message, - i + 1, number_of_forensic_msgs, msg_uid)) - try: - server.move_messages([msg_uid], - forensic_reports_folder) - except Exception as e: - e = "Error moving message UID {0}: {1}".format( - msg_uid, e) - logger.error("IMAP error: {0}".format(e)) - results = OrderedDict([("aggregate_reports", aggregate_reports), - ("forensic_reports", forensic_reports)]) + if len(forensic_report_msg_uids) > 0: + message = "Moving forensic report messages from" + logger.debug( + "{0} {1} to {2}".format(message, + reports_folder, + forensic_reports_folder)) + number_of_forensic_msgs = len(forensic_report_msg_uids) + for i in range(number_of_forensic_msgs): + msg_uid = forensic_report_msg_uids[i] + message = "Moving message" + logger.debug("{0} {1} of {2}: UID {2}".format( + message, + i + 1, number_of_forensic_msgs, msg_uid)) + try: + server.move_messages([msg_uid], + forensic_reports_folder) + except Exception as e: + e = "Error moving message UID {0}: {1}".format( + msg_uid, e) + logger.error("IMAP error: {0}".format(e)) + results = OrderedDict([("aggregate_reports", aggregate_reports), + ("forensic_reports", forensic_reports)]) total_messages = len(server.search()) @@ -1024,8 +1037,8 @@ def get_dmarc_reports_from_inbox(connection=None, def watch_inbox(host, username, password, callback, port=None, ssl=True, verify=True, reports_folder="INBOX", - archive_folder="Archive", delete=False, test=False, wait=30, - nameservers=None, dns_timeout=6.0, + archive_folder="Archive", delete=False, test=False, + idle_timeout=30, nameservers=None, dns_timeout=6.0, strip_attachment_payloads=False): """ Use an IDLE IMAP connection to parse incoming emails, and pass the results @@ -1042,7 +1055,7 @@ def watch_inbox(host, username, password, callback, port=None, ssl=True, archive_folder: The folder to move processed mail to delete (bool): Delete messages after processing them test (bool): Do not move or delete messages after processing them - wait (int): Number of seconds to wait for a IMAP IDLE response + idle_timeout (int): Number of seconds to wait for a IMAP IDLE response nameservers (list): A list of one or more nameservers to use (Cloudflare's public DNS resolvers by default) dns_timeout (float): Set the DNS query timeout @@ -1054,19 +1067,23 @@ def watch_inbox(host, username, password, callback, port=None, ssl=True, ns = nameservers dt = dns_timeout sa = strip_attachment_payloads - server = IMAPClient(host=host, username=username, password=password, - port=port, ssl=ssl, verify=verify, - initial_folder=reports_folder, - idle_callback=get_dmarc_reports_from_inbox) - res = get_dmarc_reports_from_inbox(connection=server, - reports_folder=rf, - archive_folder=af, - delete=delete, - test=test, - nameservers=ns, - dns_timeout=dt, - strip_attachment_payloads=sa) - callback(res) + + def idle_callback(connection): + res = get_dmarc_reports_from_inbox(connection=connection, + reports_folder=rf, + archive_folder=af, + delete=delete, + test=test, + nameservers=ns, + dns_timeout=dt, + strip_attachment_payloads=sa) + callback(res) + + IMAPClient(host=host, username=username, password=password, + port=port, ssl=ssl, verify=verify, + initial_folder=rf, + idle_callback=idle_callback, + idle_timeout=idle_timeout) def save_output(results, output_directory="output"): diff --git a/setup.py b/setup.py index 3a1bd1b..19b9f79 100644 --- a/setup.py +++ b/setup.py @@ -97,6 +97,7 @@ setup( 'urllib3<1.25,>=1.21.1', 'requests>=2.2.16.0', 'imapclient>=2.1.0', 'mail-parser>=3.9.2', 'dateparser>=0.7.1', + 'mailsuite>=1.0.0', 'elasticsearch>=6.3.1,<7.0.0', 'elasticsearch-dsl>=6.3.1,<7.0.0', 'kafka-python>=1.4.4',