From bc684c891340abf39b91b5a2a758f6ace623d2f9 Mon Sep 17 00:00:00 2001 From: Tom Henderson Date: Fri, 5 Feb 2021 13:37:09 +1300 Subject: [PATCH 1/3] Add option to process messages in batches --- README.rst | 1 + docs/index.rst | 1 + parsedmarc/__init__.py | 25 ++++++++++++++++++------- parsedmarc/cli.py | 14 +++++++++++--- 4 files changed, 31 insertions(+), 10 deletions(-) diff --git a/README.rst b/README.rst index b028445..ce3f2ea 100644 --- a/README.rst +++ b/README.rst @@ -155,6 +155,7 @@ The full set of configuration options are: - ``watch`` - bool: Use the IMAP ``IDLE`` command to process messages as they arrive - ``delete`` - bool: Delete messages after processing them, instead of archiving them - ``test`` - bool: Do not move or delete messages + - ``batch_size`` - int: Number of messages to read and process before saving. Defaults to all messages if not set. - ``elasticsearch`` - ``hosts`` - str: A comma separated list of hostnames and ports or URLs (e.g. ``127.0.0.1:9200`` or ``https://user:secret@localhost``) diff --git a/docs/index.rst b/docs/index.rst index 449f048..7897a30 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -161,6 +161,7 @@ The full set of configuration options are: - ``watch`` - bool: Use the IMAP ``IDLE`` command to process messages as they arrive - ``delete`` - bool: Delete messages after processing them, instead of archiving them - ``test`` - bool: Do not move or delete messages + - ``batch_size`` - int: Number of messages to read and process before saving. Defaults to all messages if not set. - ``elasticsearch`` - ``hosts`` - str: A comma separated list of hostnames and ports or URLs (e.g. ``127.0.0.1:9200`` or ``https://user:secret@localhost``) diff --git a/parsedmarc/__init__.py b/parsedmarc/__init__.py index ca4f1e7..8380df7 100644 --- a/parsedmarc/__init__.py +++ b/parsedmarc/__init__.py @@ -1004,7 +1004,8 @@ def get_dmarc_reports_from_inbox(connection=None, nameservers=None, dns_timeout=6.0, strip_attachment_payloads=False, - results=None): + results=None, + batch_size=None): """ Fetches and parses DMARC reports from an inbox @@ -1028,6 +1029,7 @@ def get_dmarc_reports_from_inbox(connection=None, strip_attachment_payloads (bool): Remove attachment payloads from forensic report results results (dict): Results from the previous run + batch_size (int): Number of messages to read and process before saving Returns: OrderedDict: Lists of ``aggregate_reports`` and ``forensic_reports`` @@ -1069,11 +1071,18 @@ def get_dmarc_reports_from_inbox(connection=None, total_messages = len(messages) logger.debug("Found {0} messages in {1}".format(len(messages), reports_folder)) - for i in range(len(messages)): + + if batch_size: + message_limit = batch_size + else: + message_limit = total_messages + + logger.debug("Processing {0} messages".format(message_limit)) + + for i in range(message_limit): msg_uid = messages[i] logger.debug("Processing message {0} of {1}: UID {2}".format( - i+1, total_messages, msg_uid - + i+1, message_limit, msg_uid )) msg_content = server.fetch_message(msg_uid, parse=False) sa = strip_attachment_payloads @@ -1165,7 +1174,7 @@ def get_dmarc_reports_from_inbox(connection=None, total_messages = len(server.search()) - if not test and total_messages > 0: + if not test and not batch_size and total_messages > 0: # Process emails that came in during the last run results = get_dmarc_reports_from_inbox( connection=server, @@ -1187,7 +1196,7 @@ def watch_inbox(host, username, password, callback, port=None, ssl=True, verify=True, reports_folder="INBOX", archive_folder="Archive", delete=False, test=False, idle_timeout=30, offline=False, nameservers=None, - dns_timeout=6.0, strip_attachment_payloads=False): + dns_timeout=6.0, strip_attachment_payloads=False, batch_size=None): """ Use an IDLE IMAP connection to parse incoming emails, and pass the results to a callback function @@ -1210,6 +1219,7 @@ def watch_inbox(host, username, password, callback, port=None, ssl=True, dns_timeout (float): Set the DNS query timeout strip_attachment_payloads (bool): Replace attachment payloads in forensic report samples with None + batch_size (int): Number of messages to read and process before saving """ sa = strip_attachment_payloads @@ -1222,7 +1232,8 @@ def watch_inbox(host, username, password, callback, port=None, ssl=True, offline=offline, nameservers=nameservers, dns_timeout=dns_timeout, - strip_attachment_payloads=sa) + strip_attachment_payloads=sa, + batch_size=batch_size) callback(res) while True: diff --git a/parsedmarc/cli.py b/parsedmarc/cli.py index e9119e0..df1a870 100644 --- a/parsedmarc/cli.py +++ b/parsedmarc/cli.py @@ -210,6 +210,7 @@ def _main(): imap_watch=False, imap_delete=False, imap_test=False, + imap_batch_size=None, hec=None, hec_token=None, hec_index=None, @@ -327,6 +328,10 @@ def _main(): opts.imap_delete = imap_config.getboolean("delete") if "test" in imap_config: opts.imap_test = imap_config.getboolean("test") + if "batch_size" in imap_config: + opts.imap_batch_size = imap_config.getint("batch_size") + else: + opts.imap_batch_size = None if "elasticsearch" in config: elasticsearch_config = config["elasticsearch"] if "hosts" in elasticsearch_config: @@ -613,8 +618,9 @@ def _main(): offline=opts.offline, nameservers=ns, test=opts.imap_test, - strip_attachment_payloads=sa - ) + strip_attachment_payloads=sa, + batch_size=opts.imap_batch_size + ) aggregate_reports += reports["aggregate_reports"] forensic_reports += reports["forensic_reports"] @@ -670,7 +676,9 @@ def _main(): test=opts.imap_test, nameservers=opts.nameservers, dns_timeout=opts.dns_timeout, - strip_attachment_payloads=sa) + strip_attachment_payloads=sa, + batch_size=opts.imap_batch_size + ) except FileExistsError as error: logger.error("{0}".format(error.__str__())) exit(1) From 9522c9b6e4400a3758a74d143f1b6d7a69d0a12b Mon Sep 17 00:00:00 2001 From: Tom Henderson Date: Fri, 5 Feb 2021 14:51:32 +1300 Subject: [PATCH 2/3] Ensure message_limit is not greater than total_messages --- parsedmarc/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parsedmarc/__init__.py b/parsedmarc/__init__.py index 8380df7..78bd356 100644 --- a/parsedmarc/__init__.py +++ b/parsedmarc/__init__.py @@ -1073,7 +1073,7 @@ def get_dmarc_reports_from_inbox(connection=None, reports_folder)) if batch_size: - message_limit = batch_size + message_limit = min(total_messages, batch_size) else: message_limit = total_messages From de05be90df8ef7fc1233be6803c5991da7c9345c Mon Sep 17 00:00:00 2001 From: Tom Henderson Date: Fri, 5 Feb 2021 14:53:43 +1300 Subject: [PATCH 3/3] Fix flake8 error --- parsedmarc/__init__.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/parsedmarc/__init__.py b/parsedmarc/__init__.py index 78bd356..2f192a5 100644 --- a/parsedmarc/__init__.py +++ b/parsedmarc/__init__.py @@ -1196,7 +1196,8 @@ def watch_inbox(host, username, password, callback, port=None, ssl=True, verify=True, reports_folder="INBOX", archive_folder="Archive", delete=False, test=False, idle_timeout=30, offline=False, nameservers=None, - dns_timeout=6.0, strip_attachment_payloads=False, batch_size=None): + dns_timeout=6.0, strip_attachment_payloads=False, + batch_size=None): """ Use an IDLE IMAP connection to parse incoming emails, and pass the results to a callback function