Merge pull request #222 from tom-henderson/imap_batch_size

Add option to process messages in batches
This commit is contained in:
Sean Whalen
2021-06-19 11:41:26 -04:00
committed by GitHub
4 changed files with 32 additions and 10 deletions

View File

@@ -159,6 +159,7 @@ The full set of configuration options are:
- ``watch`` - bool: Use the IMAP ``IDLE`` command to process messages as they arrive
- ``delete`` - bool: Delete messages after processing them, instead of archiving them
- ``test`` - bool: Do not move or delete messages
- ``batch_size`` - int: Number of messages to read and process before saving. Defaults to all messages if not set.
- ``elasticsearch``
- ``hosts`` - str: A comma separated list of hostnames and ports or URLs (e.g. ``127.0.0.1:9200`` or ``https://user:secret@localhost``)

View File

@@ -165,6 +165,7 @@ The full set of configuration options are:
- ``watch`` - bool: Use the IMAP ``IDLE`` command to process messages as they arrive
- ``delete`` - bool: Delete messages after processing them, instead of archiving them
- ``test`` - bool: Do not move or delete messages
- ``batch_size`` - int: Number of messages to read and process before saving. Defaults to all messages if not set.
- ``elasticsearch``
- ``hosts`` - str: A comma separated list of hostnames and ports or URLs (e.g. ``127.0.0.1:9200`` or ``https://user:secret@localhost``)

View File

@@ -1022,7 +1022,8 @@ def get_dmarc_reports_from_inbox(connection=None,
nameservers=None,
dns_timeout=6.0,
strip_attachment_payloads=False,
results=None):
results=None,
batch_size=None):
"""
Fetches and parses DMARC reports from an inbox
@@ -1046,6 +1047,7 @@ def get_dmarc_reports_from_inbox(connection=None,
strip_attachment_payloads (bool): Remove attachment payloads from
forensic report results
results (dict): Results from the previous run
batch_size (int): Number of messages to read and process before saving
Returns:
OrderedDict: Lists of ``aggregate_reports`` and ``forensic_reports``
@@ -1088,11 +1090,18 @@ def get_dmarc_reports_from_inbox(connection=None,
total_messages = len(messages)
logger.debug("Found {0} messages in {1}".format(len(messages),
reports_folder))
for i in range(len(messages)):
msg_uid = messages[i]
logger.info("Processing message {0} of {1}: UID {2}".format(
i+1, total_messages, msg_uid
if batch_size:
message_limit = min(total_messages, batch_size)
else:
message_limit = total_messages
logger.debug("Processing {0} messages".format(message_limit))
for i in range(message_limit):
msg_uid = messages[i]
logger.debug("Processing message {0} of {1}: UID {2}".format(
i+1, message_limit, msg_uid
))
msg_content = server.fetch_message(msg_uid, parse=False)
sa = strip_attachment_payloads
@@ -1185,7 +1194,7 @@ def get_dmarc_reports_from_inbox(connection=None,
total_messages = len(server.search())
if not test and total_messages > 0:
if not test and not batch_size and total_messages > 0:
# Process emails that came in during the last run
results = get_dmarc_reports_from_inbox(
connection=server,
@@ -1207,7 +1216,8 @@ def watch_inbox(host, username, password, callback, port=None, ssl=True,
verify=True, reports_folder="INBOX",
archive_folder="Archive", delete=False, test=False,
idle_timeout=30, offline=False, nameservers=None,
dns_timeout=6.0, strip_attachment_payloads=False):
dns_timeout=6.0, strip_attachment_payloads=False,
batch_size=None):
"""
Use an IDLE IMAP connection to parse incoming emails, and pass the results
to a callback function
@@ -1230,6 +1240,7 @@ def watch_inbox(host, username, password, callback, port=None, ssl=True,
dns_timeout (float): Set the DNS query timeout
strip_attachment_payloads (bool): Replace attachment payloads in
forensic report samples with None
batch_size (int): Number of messages to read and process before saving
"""
sa = strip_attachment_payloads
@@ -1242,7 +1253,8 @@ def watch_inbox(host, username, password, callback, port=None, ssl=True,
offline=offline,
nameservers=nameservers,
dns_timeout=dns_timeout,
strip_attachment_payloads=sa)
strip_attachment_payloads=sa,
batch_size=batch_size)
callback(res)
while True:

View File

@@ -245,6 +245,7 @@ def _main():
imap_watch=False,
imap_delete=False,
imap_test=False,
imap_batch_size=None,
hec=None,
hec_token=None,
hec_index=None,
@@ -364,6 +365,10 @@ def _main():
opts.imap_delete = imap_config.getboolean("delete")
if "test" in imap_config:
opts.imap_test = imap_config.getboolean("test")
if "batch_size" in imap_config:
opts.imap_batch_size = imap_config.getint("batch_size")
else:
opts.imap_batch_size = None
if "elasticsearch" in config:
elasticsearch_config = config["elasticsearch"]
if "hosts" in elasticsearch_config:
@@ -668,8 +673,9 @@ def _main():
offline=opts.offline,
nameservers=ns,
test=opts.imap_test,
strip_attachment_payloads=sa
)
strip_attachment_payloads=sa,
batch_size=opts.imap_batch_size
)
aggregate_reports += reports["aggregate_reports"]
forensic_reports += reports["forensic_reports"]
@@ -729,6 +735,8 @@ def _main():
test=opts.imap_test,
nameservers=opts.nameservers,
dns_timeout=opts.dns_timeout,
strip_attachment_payloads=sa,
batch_size=opts.imap_batch_size
offline=opts.offline,
strip_attachment_payloads=sa)
except FileExistsError as error: