Add option to process messages in batches

This commit is contained in:
Tom Henderson
2021-02-05 13:37:09 +13:00
parent b43a622f9e
commit bc684c8913
4 changed files with 31 additions and 10 deletions
+1
View File
@@ -155,6 +155,7 @@ The full set of configuration options are:
- ``watch`` - bool: Use the IMAP ``IDLE`` command to process messages as they arrive
- ``delete`` - bool: Delete messages after processing them, instead of archiving them
- ``test`` - bool: Do not move or delete messages
- ``batch_size`` - int: Number of messages to read and process before saving. Defaults to all messages if not set.
- ``elasticsearch``
- ``hosts`` - str: A comma separated list of hostnames and ports or URLs (e.g. ``127.0.0.1:9200`` or ``https://user:secret@localhost``)
+1
View File
@@ -161,6 +161,7 @@ The full set of configuration options are:
- ``watch`` - bool: Use the IMAP ``IDLE`` command to process messages as they arrive
- ``delete`` - bool: Delete messages after processing them, instead of archiving them
- ``test`` - bool: Do not move or delete messages
- ``batch_size`` - int: Number of messages to read and process before saving. Defaults to all messages if not set.
- ``elasticsearch``
- ``hosts`` - str: A comma separated list of hostnames and ports or URLs (e.g. ``127.0.0.1:9200`` or ``https://user:secret@localhost``)
+18 -7
View File
@@ -1004,7 +1004,8 @@ def get_dmarc_reports_from_inbox(connection=None,
nameservers=None,
dns_timeout=6.0,
strip_attachment_payloads=False,
results=None):
results=None,
batch_size=None):
"""
Fetches and parses DMARC reports from an inbox
@@ -1028,6 +1029,7 @@ def get_dmarc_reports_from_inbox(connection=None,
strip_attachment_payloads (bool): Remove attachment payloads from
forensic report results
results (dict): Results from the previous run
batch_size (int): Number of messages to read and process before saving
Returns:
OrderedDict: Lists of ``aggregate_reports`` and ``forensic_reports``
@@ -1069,11 +1071,18 @@ def get_dmarc_reports_from_inbox(connection=None,
total_messages = len(messages)
logger.debug("Found {0} messages in {1}".format(len(messages),
reports_folder))
for i in range(len(messages)):
if batch_size:
message_limit = batch_size
else:
message_limit = total_messages
logger.debug("Processing {0} messages".format(message_limit))
for i in range(message_limit):
msg_uid = messages[i]
logger.debug("Processing message {0} of {1}: UID {2}".format(
i+1, total_messages, msg_uid
i+1, message_limit, msg_uid
))
msg_content = server.fetch_message(msg_uid, parse=False)
sa = strip_attachment_payloads
@@ -1165,7 +1174,7 @@ def get_dmarc_reports_from_inbox(connection=None,
total_messages = len(server.search())
if not test and total_messages > 0:
if not test and not batch_size and total_messages > 0:
# Process emails that came in during the last run
results = get_dmarc_reports_from_inbox(
connection=server,
@@ -1187,7 +1196,7 @@ def watch_inbox(host, username, password, callback, port=None, ssl=True,
verify=True, reports_folder="INBOX",
archive_folder="Archive", delete=False, test=False,
idle_timeout=30, offline=False, nameservers=None,
dns_timeout=6.0, strip_attachment_payloads=False):
dns_timeout=6.0, strip_attachment_payloads=False, batch_size=None):
"""
Use an IDLE IMAP connection to parse incoming emails, and pass the results
to a callback function
@@ -1210,6 +1219,7 @@ def watch_inbox(host, username, password, callback, port=None, ssl=True,
dns_timeout (float): Set the DNS query timeout
strip_attachment_payloads (bool): Replace attachment payloads in
forensic report samples with None
batch_size (int): Number of messages to read and process before saving
"""
sa = strip_attachment_payloads
@@ -1222,7 +1232,8 @@ def watch_inbox(host, username, password, callback, port=None, ssl=True,
offline=offline,
nameservers=nameservers,
dns_timeout=dns_timeout,
strip_attachment_payloads=sa)
strip_attachment_payloads=sa,
batch_size=batch_size)
callback(res)
while True:
+11 -3
View File
@@ -210,6 +210,7 @@ def _main():
imap_watch=False,
imap_delete=False,
imap_test=False,
imap_batch_size=None,
hec=None,
hec_token=None,
hec_index=None,
@@ -327,6 +328,10 @@ def _main():
opts.imap_delete = imap_config.getboolean("delete")
if "test" in imap_config:
opts.imap_test = imap_config.getboolean("test")
if "batch_size" in imap_config:
opts.imap_batch_size = imap_config.getint("batch_size")
else:
opts.imap_batch_size = None
if "elasticsearch" in config:
elasticsearch_config = config["elasticsearch"]
if "hosts" in elasticsearch_config:
@@ -613,8 +618,9 @@ def _main():
offline=opts.offline,
nameservers=ns,
test=opts.imap_test,
strip_attachment_payloads=sa
)
strip_attachment_payloads=sa,
batch_size=opts.imap_batch_size
)
aggregate_reports += reports["aggregate_reports"]
forensic_reports += reports["forensic_reports"]
@@ -670,7 +676,9 @@ def _main():
test=opts.imap_test,
nameservers=opts.nameservers,
dns_timeout=opts.dns_timeout,
strip_attachment_payloads=sa)
strip_attachment_payloads=sa,
batch_size=opts.imap_batch_size
)
except FileExistsError as error:
logger.error("{0}".format(error.__str__()))
exit(1)