Compare commits

..

12 Commits

Author SHA1 Message Date
copilot-swe-agent[bot]
3509e5c553 Fix resource leak: validate HEC settings before creating any output clients
Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
Agent-Logs-Url: https://github.com/domainaware/parsedmarc/sessions/38c73e09-789d-4d41-b75e-bbc61418859d
2026-03-20 22:52:49 +00:00
copilot-swe-agent[bot]
964a1baeed Initial plan 2026-03-20 22:49:31 +00:00
Sean Whalen
7688d00226 Update parsedmarc/cli.py
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2026-03-20 18:49:09 -04:00
Copilot
8796c7e3bd Fix SIGHUP reload tight-loop in watch mode (#702)
* Initial plan

* Fix _reload_requested tight-loop: reset flag before reload to capture concurrent SIGHUPs

Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
Agent-Logs-Url: https://github.com/domainaware/parsedmarc/sessions/879d0bb1-9037-41f7-bc89-f59611956d2e

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
2026-03-20 17:31:45 -04:00
Copilot
a05eb0807a Best-effort initialization for optional output clients in watch mode (#701)
* Initial plan

* Wrap optional output client init in try/except for best-effort initialization

Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
Agent-Logs-Url: https://github.com/domainaware/parsedmarc/sessions/59241d4e-1b05-4a92-b2d2-e6d13d10a4fd

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
2026-03-20 17:18:08 -04:00
Sean Whalen
6fceb3e2ce Update parsedmarc/cli.py
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2026-03-20 17:07:20 -04:00
Copilot
510d5d05a9 SIGHUP-based configuration reload: address review feedback (#700)
* Initial plan

* Address review feedback: kafka_ssl, duplicate silent, exception chain, log file reload, should_reload timing

Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
Agent-Logs-Url: https://github.com/domainaware/parsedmarc/sessions/a8a43c55-23fa-4471-abe6-7ac966f381f9

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
2026-03-20 16:57:00 -04:00
Copilot
3445438684 [WIP] SIGHUP-based configuration reload for watch mode (#699)
* Initial plan

* Fix review comments: ConfigurationError wrapping, duplicate parse args, bool parsing, Kafka required topics, should_reload kwarg, SIGHUP test skips

Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
Agent-Logs-Url: https://github.com/domainaware/parsedmarc/sessions/0779003c-ccbe-4d76-9748-801dbc238b96

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
2026-03-20 15:54:40 -04:00
Copilot
17defb75b0 [WIP] SIGHUP-based configuration reload for watch mode (#698)
* Initial plan

* Fix reload state consistency, resource leaks, stale opts; add tests

Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
Agent-Logs-Url: https://github.com/domainaware/parsedmarc/sessions/3c2e0bb9-7e2d-4efa-aef6-d2b98478b921

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
2026-03-20 15:40:44 -04:00
Sean Whalen
893d0a4f03 Update parsedmarc/cli.py
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2026-03-20 15:28:38 -04:00
Sean Whalen
58e07140a8 Update parsedmarc/cli.py
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2026-03-20 15:27:22 -04:00
Sean Whalen
dfdffe4947 Enhance mailbox connection watch method to support reload functionality
- Updated the `watch` method in `GmailConnection`, `MSGraphConnection`, `IMAPConnection`, `MaildirConnection`, and the abstract `MailboxConnection` class to accept an optional `should_reload` parameter. This allows the method to check if a reload is necessary and exit the loop if so.
- Modified related tests to accommodate the new method signature.
- Changed logger calls from `critical` to `error` for consistency in logging severity.
- Added a new settings file for Claude with specific permissions for testing and code checks.
2026-03-20 15:00:21 -04:00
15 changed files with 207 additions and 256 deletions

View File

@@ -7,8 +7,7 @@
"Bash(GITHUB_ACTIONS=true pytest --cov tests.py)",
"Bash(ls tests*)",
"Bash(GITHUB_ACTIONS=true python -m pytest --cov tests.py -x)",
"Bash(GITHUB_ACTIONS=true python -m pytest tests.py -x -v)",
"Bash(python -m pytest tests.py --no-header -q)"
"Bash(GITHUB_ACTIONS=true python -m pytest tests.py -x -v)"
],
"additionalDirectories": [
"/tmp"

View File

@@ -4,25 +4,20 @@
### Added
- SIGHUP-based configuration reload for watch mode — update output destinations, DNS/GeoIP settings, processing flags, and log level without restarting the service or interrupting in-progress report processing.
- Use `systemctl reload parsedmarc` when running under `systemd`.
- On a successful reload, old output clients are closed and recreated.
- On a failed reload, the previous configuration remains fully active.
- `close()` methods on `GelfClient`, `KafkaClient`, `SyslogClient`, `WebhookClient`, HECClient, and `S3Client` for clean resource teardown on reload.
- `config_reloading` parameter on all `MailboxConnection.watch()` implementations and `watch_inbox()` to ensure SIGHUP never triggers a new email batch mid-reload.
- Elasticsearch and OpenSearch connections are now tracked and cleaned up on reload via `_close_output_clients()`.
- Extracted `_parse_config_file()` and `_init_output_clients()` from `_main()` in `cli.py` to support config reload and reduce code duplication.
### Fixed
- `get_index_prefix()` crashed on forensic reports with `TypeError` due to `report()` instead of `report[]` dict access.
- Missing `exit(1)` after IMAP user/password validation failure allowed execution to continue with `None` credentials.
- SIGHUP-based configuration reload for watch mode — update output
destinations, DNS/GeoIP settings, processing flags, and log level
without restarting the service or interrupting in-progress report
processing. Use `systemctl reload parsedmarc` when running under
systemd.
- Extracted `_parse_config_file()` and `_init_output_clients()` from
`_main()` in `cli.py` to support config reload and reduce code
duplication.
## 9.2.1
### Added
- Better checking of `msgraph` configuration (PR #695)
- Better checking of `msconfig` configuration (PR #695)
### Changed

View File

@@ -666,15 +666,8 @@ Mailbox connection settings (IMAP host/credentials, Microsoft Graph,
Gmail API, Maildir path) are **not** reloaded — changing those still
requires a full restart.
On a **successful** reload, existing output client connections are
closed and new ones are created from the updated configuration. The
service then resumes watching with the new settings.
If the new configuration file contains errors (missing required
settings, unreachable output destinations, etc.), the **entire reload
is aborted** — no output clients are replaced and the previous
configuration remains fully active. This means a typo in one section
will not take down an otherwise working setup. Check the logs for
If the new configuration file contains errors, the reload is aborted
and the previous configuration remains active. Check the logs for
details:
```bash

View File

@@ -2195,7 +2195,7 @@ def watch_inbox(
batch_size: int = 10,
since: Optional[Union[datetime, date, str]] = None,
normalize_timespan_threshold_hours: float = 24,
config_reloading: Optional[Callable] = None,
should_reload: Optional[Callable] = None,
):
"""
Watches the mailbox for new messages and
@@ -2223,7 +2223,7 @@ def watch_inbox(
batch_size (int): Number of messages to read and process before saving
since: Search for messages since certain time
normalize_timespan_threshold_hours (float): Normalize timespans beyond this
config_reloading: Optional callable that returns True when a config
should_reload: Optional callable that returns True when a config
reload has been requested (e.g. via SIGHUP)
"""
@@ -2253,8 +2253,8 @@ def watch_inbox(
"check_callback": check_callback,
"check_timeout": check_timeout,
}
if config_reloading is not None:
watch_kwargs["config_reloading"] = config_reloading
if should_reload is not None:
watch_kwargs["should_reload"] = should_reload
mailbox_connection.watch(**watch_kwargs)

View File

@@ -201,8 +201,21 @@ def _parse_config_file(config_file, opts):
"normalize_timespan_threshold_hours"
)
if "index_prefix_domain_map" in general_config:
with open(general_config["index_prefix_domain_map"]) as f:
index_prefix_domain_map = yaml.safe_load(f)
map_path = general_config["index_prefix_domain_map"]
try:
with open(map_path) as f:
index_prefix_domain_map = yaml.safe_load(f)
except OSError as exc:
raise ConfigurationError(
"Failed to read index_prefix_domain_map file '{0}': {1}".format(
map_path, exc
)
) from exc
except yaml.YAMLError as exc:
raise ConfigurationError(
"Failed to parse YAML in index_prefix_domain_map "
"file '{0}': {1}".format(map_path, exc)
) from exc
if "offline" in general_config:
opts.offline = bool(general_config.getboolean("offline"))
if "strip_attachment_payloads" in general_config:
@@ -747,15 +760,15 @@ def _parse_config_file(config_file, opts):
if "oauth2_port" in gmail_api_config:
opts.gmail_api_oauth2_port = gmail_api_config.getint("oauth2_port", 8080)
if "auth_mode" in gmail_api_config:
opts.gmail_api_auth_mode = gmail_api_config["auth_mode"].strip()
opts.gmail_api_auth_mode = gmail_api_config.get("auth_mode").strip()
if "service_account_user" in gmail_api_config:
opts.gmail_api_service_account_user = gmail_api_config[
opts.gmail_api_service_account_user = gmail_api_config.get(
"service_account_user"
].strip()
).strip()
elif "delegated_user" in gmail_api_config:
opts.gmail_api_service_account_user = gmail_api_config[
opts.gmail_api_service_account_user = gmail_api_config.get(
"delegated_user"
].strip()
).strip()
if "maildir" in config.sections():
maildir_api_config = config["maildir"]
@@ -810,38 +823,6 @@ def _parse_config_file(config_file, opts):
return index_prefix_domain_map
class _ElasticsearchHandle:
"""Sentinel so Elasticsearch participates in _close_output_clients."""
def close(self):
try:
conn = elastic.connections.get_connection()
if not isinstance(conn, str):
conn.close()
except Exception:
pass
try:
elastic.connections.remove_connection("default")
except Exception:
pass
class _OpenSearchHandle:
"""Sentinel so OpenSearch participates in _close_output_clients."""
def close(self):
try:
conn = opensearch.connections.get_connection()
if not isinstance(conn, str):
conn.close()
except Exception:
pass
try:
opensearch.connections.remove_connection("default")
except Exception:
pass
def _init_output_clients(opts):
"""Create output clients based on current opts.
@@ -851,85 +832,15 @@ def _init_output_clients(opts):
Raises:
ConfigurationError: If a required output client cannot be created.
"""
# Validate all required settings before creating any clients so that a
# ConfigurationError does not leave partially-created clients un-closed.
if opts.hec and (opts.hec_token is None or opts.hec_index is None):
raise ConfigurationError(
"HEC token and HEC index are required when using HEC URL"
)
clients = {}
if opts.s3_bucket:
clients["s3_client"] = s3.S3Client(
bucket_name=opts.s3_bucket,
bucket_path=opts.s3_path,
region_name=opts.s3_region_name,
endpoint_url=opts.s3_endpoint_url,
access_key_id=opts.s3_access_key_id,
secret_access_key=opts.s3_secret_access_key,
)
if opts.syslog_server:
clients["syslog_client"] = syslog.SyslogClient(
server_name=opts.syslog_server,
server_port=int(opts.syslog_port),
protocol=opts.syslog_protocol or "udp",
cafile_path=opts.syslog_cafile_path,
certfile_path=opts.syslog_certfile_path,
keyfile_path=opts.syslog_keyfile_path,
timeout=opts.syslog_timeout if opts.syslog_timeout is not None else 5.0,
retry_attempts=opts.syslog_retry_attempts
if opts.syslog_retry_attempts is not None
else 3,
retry_delay=opts.syslog_retry_delay
if opts.syslog_retry_delay is not None
else 5,
)
if opts.hec:
if opts.hec_token is None or opts.hec_index is None:
raise ConfigurationError(
"HEC token and HEC index are required when using HEC URL"
)
verify = True
if opts.hec_skip_certificate_verification:
verify = False
clients["hec_client"] = splunk.HECClient(
opts.hec, opts.hec_token, opts.hec_index, verify=verify
)
if opts.kafka_hosts:
ssl_context = None
if opts.kafka_skip_certificate_verification:
logger.debug("Skipping Kafka certificate verification")
ssl_context = create_default_context()
ssl_context.check_hostname = False
ssl_context.verify_mode = CERT_NONE
clients["kafka_client"] = kafkaclient.KafkaClient(
opts.kafka_hosts,
username=opts.kafka_username,
password=opts.kafka_password,
ssl_context=ssl_context,
)
if opts.gelf_host:
clients["gelf_client"] = gelf.GelfClient(
host=opts.gelf_host,
port=int(opts.gelf_port),
mode=opts.gelf_mode,
)
if (
opts.webhook_aggregate_url
or opts.webhook_forensic_url
or opts.webhook_smtp_tls_url
):
clients["webhook_client"] = webhook.WebhookClient(
aggregate_url=opts.webhook_aggregate_url,
forensic_url=opts.webhook_forensic_url,
smtp_tls_url=opts.webhook_smtp_tls_url,
timeout=opts.webhook_timeout,
)
# Elasticsearch and OpenSearch mutate module-level global state via
# connections.create_connection(), which cannot be rolled back if a later
# step fails. Initialise them last so that all other clients are created
# successfully first; this minimises the window for partial-init problems
# during config reload.
if opts.save_aggregate or opts.save_forensic or opts.save_smtp_tls:
if opts.elasticsearch_hosts:
es_aggregate_index = "dmarc_aggregate"
@@ -963,7 +874,6 @@ def _init_output_clients(opts):
aggregate_indexes=[es_aggregate_index],
forensic_indexes=[es_forensic_index],
)
clients["elasticsearch"] = _ElasticsearchHandle()
if opts.opensearch_hosts:
os_aggregate_index = "dmarc_aggregate"
@@ -1000,7 +910,100 @@ def _init_output_clients(opts):
aggregate_indexes=[os_aggregate_index],
forensic_indexes=[os_forensic_index],
)
clients["opensearch"] = _OpenSearchHandle()
if opts.s3_bucket:
try:
clients["s3_client"] = s3.S3Client(
bucket_name=opts.s3_bucket,
bucket_path=opts.s3_path,
region_name=opts.s3_region_name,
endpoint_url=opts.s3_endpoint_url,
access_key_id=opts.s3_access_key_id,
secret_access_key=opts.s3_secret_access_key,
)
except Exception:
logger.warning("Failed to initialize S3 client; skipping", exc_info=True)
if opts.syslog_server:
try:
clients["syslog_client"] = syslog.SyslogClient(
server_name=opts.syslog_server,
server_port=int(opts.syslog_port),
protocol=opts.syslog_protocol or "udp",
cafile_path=opts.syslog_cafile_path,
certfile_path=opts.syslog_certfile_path,
keyfile_path=opts.syslog_keyfile_path,
timeout=opts.syslog_timeout if opts.syslog_timeout is not None else 5.0,
retry_attempts=opts.syslog_retry_attempts
if opts.syslog_retry_attempts is not None
else 3,
retry_delay=opts.syslog_retry_delay
if opts.syslog_retry_delay is not None
else 5,
)
except Exception:
logger.warning(
"Failed to initialize syslog client; skipping", exc_info=True
)
if opts.hec:
verify = True
if opts.hec_skip_certificate_verification:
verify = False
try:
clients["hec_client"] = splunk.HECClient(
opts.hec, opts.hec_token, opts.hec_index, verify=verify
)
except Exception:
logger.warning(
"Failed to initialize Splunk HEC client; skipping", exc_info=True
)
if opts.kafka_hosts:
ssl_context = None
if opts.kafka_ssl:
ssl_context = create_default_context()
if opts.kafka_skip_certificate_verification:
logger.debug("Skipping Kafka certificate verification")
ssl_context.check_hostname = False
ssl_context.verify_mode = CERT_NONE
try:
clients["kafka_client"] = kafkaclient.KafkaClient(
opts.kafka_hosts,
username=opts.kafka_username,
password=opts.kafka_password,
ssl=opts.kafka_ssl,
ssl_context=ssl_context,
)
except Exception:
logger.warning("Failed to initialize Kafka client; skipping", exc_info=True)
if opts.gelf_host:
try:
clients["gelf_client"] = gelf.GelfClient(
host=opts.gelf_host,
port=int(opts.gelf_port),
mode=opts.gelf_mode,
)
except Exception:
logger.warning("Failed to initialize GELF client; skipping", exc_info=True)
if (
opts.webhook_aggregate_url
or opts.webhook_forensic_url
or opts.webhook_smtp_tls_url
):
try:
clients["webhook_client"] = webhook.WebhookClient(
aggregate_url=opts.webhook_aggregate_url,
forensic_url=opts.webhook_forensic_url,
smtp_tls_url=opts.webhook_smtp_tls_url,
timeout=opts.webhook_timeout,
)
except Exception:
logger.warning(
"Failed to initialize webhook client; skipping", exc_info=True
)
return clients
@@ -1032,7 +1035,7 @@ def _main():
if "policy_published" in report:
domain = report["policy_published"]["domain"]
elif "reported_domain" in report:
domain = report["reported_domain"]
domain = report("reported_domain")
elif "policies" in report:
domain = report["policies"][0]["domain"]
if domain:
@@ -1627,7 +1630,7 @@ def _main():
try:
index_prefix_domain_map = _parse_config_file(args.config_file, opts)
except ConfigurationError as e:
logger.critical(str(e))
logger.error(str(e))
exit(-1)
logger.setLevel(logging.ERROR)
@@ -1649,8 +1652,6 @@ def _main():
except Exception as error:
logger.warning("Unable to write to log file: {}".format(error))
opts.active_log_file = opts.log_file
if (
opts.imap_host is None
and opts.graph_client_id is None
@@ -1666,14 +1667,14 @@ def _main():
# Initialize output clients
try:
clients = _init_output_clients(opts)
except elastic.ElasticsearchError as e:
logger.exception("Elasticsearch Error: {0}".format(e))
except elastic.ElasticsearchError:
logger.exception("Elasticsearch Error")
exit(1)
except opensearch.OpenSearchError as e:
logger.exception("OpenSearch Error: {0}".format(e))
except opensearch.OpenSearchError:
logger.exception("OpenSearch Error")
exit(1)
except ConfigurationError as e:
logger.critical(str(e))
logger.error(str(e))
exit(1)
except Exception as error_:
logger.error("Output client error: {0}".format(error_))
@@ -1809,9 +1810,8 @@ def _main():
try:
if opts.imap_user is None or opts.imap_password is None:
logger.error(
"IMAP user and password must be specified if host is specified"
"IMAP user and password must be specified ifhost is specified"
)
exit(1)
ssl = True
verify = True
@@ -1988,9 +1988,8 @@ def _main():
def _handle_sighup(signum, frame):
nonlocal _reload_requested
# Logging is not async-signal-safe; only set the flag here.
# The log message is emitted from the main loop when the flag is read.
_reload_requested = True
logger.info("SIGHUP received, config will reload after current batch")
if hasattr(signal, "SIGHUP"):
signal.signal(signal.SIGHUP, _handle_sighup)
@@ -2025,7 +2024,7 @@ def _main():
reverse_dns_map_url=opts.reverse_dns_map_url,
offline=opts.offline,
normalize_timespan_threshold_hours=normalize_timespan_threshold_hours_value,
config_reloading=lambda: _reload_requested,
should_reload=lambda: _reload_requested,
)
except FileExistsError as error:
logger.error("{0}".format(error.__str__()))
@@ -2037,11 +2036,9 @@ def _main():
if not _reload_requested:
break
# Reload configuration — emit the log message here (not in the
# signal handler, which is not async-signal-safe), then clear the
# flag so that any new SIGHUP arriving while we reload will be
# captured for the next iteration rather than being silently dropped.
logger.info("SIGHUP received, config will reload after current batch")
# Reload configuration — clear the flag first so that any new
# SIGHUP arriving while we reload will be captured for the next
# iteration rather than being silently dropped.
_reload_requested = False
logger.info("Reloading configuration...")
try:

View File

@@ -1,3 +1,3 @@
__version__ = "9.3.0"
__version__ = "9.2.1"
USER_AGENT = f"parsedmarc/{__version__}"

View File

@@ -3,7 +3,9 @@
from __future__ import annotations
import logging
import logging.handlers
import threading
from typing import Any
from pygelf import GelfTcpHandler, GelfTlsHandler, GelfUdpHandler
@@ -12,7 +14,6 @@ from parsedmarc import (
parsed_forensic_reports_to_csv_rows,
parsed_smtp_tls_reports_to_csv_rows,
)
from parsedmarc.types import AggregateReport, ForensicReport, SMTPTLSReport
log_context_data = threading.local()
@@ -36,7 +37,7 @@ class GelfClient(object):
"""
self.host = host
self.port = port
self.logger = logging.getLogger("parsedmarc_gelf")
self.logger = logging.getLogger("parsedmarc_syslog")
self.logger.setLevel(logging.INFO)
self.logger.addFilter(ContextFilter())
self.gelf_mode = {
@@ -49,7 +50,7 @@ class GelfClient(object):
)
self.logger.addHandler(self.handler)
def save_aggregate_report_to_gelf(self, aggregate_reports: list[AggregateReport]):
def save_aggregate_report_to_gelf(self, aggregate_reports: list[dict[str, Any]]):
rows = parsed_aggregate_reports_to_csv_rows(aggregate_reports)
for row in rows:
log_context_data.parsedmarc = row
@@ -57,13 +58,13 @@ class GelfClient(object):
log_context_data.parsedmarc = None
def save_forensic_report_to_gelf(self, forensic_reports: list[ForensicReport]):
def save_forensic_report_to_gelf(self, forensic_reports: list[dict[str, Any]]):
rows = parsed_forensic_reports_to_csv_rows(forensic_reports)
for row in rows:
log_context_data.parsedmarc = row
self.logger.info("parsedmarc forensic report")
def save_smtp_tls_report_to_gelf(self, smtp_tls_reports: SMTPTLSReport):
def save_smtp_tls_report_to_gelf(self, smtp_tls_reports: dict[str, Any]):
rows = parsed_smtp_tls_reports_to_csv_rows(smtp_tls_reports)
for row in rows:
log_context_data.parsedmarc = row

View File

@@ -175,13 +175,13 @@ class GmailConnection(MailboxConnection):
# Not needed
pass
def watch(self, check_callback, check_timeout, config_reloading=None):
def watch(self, check_callback, check_timeout, should_reload=None):
"""Checks the mailbox for new messages every n seconds"""
while True:
if config_reloading and config_reloading():
if should_reload and should_reload():
return
sleep(check_timeout)
if config_reloading and config_reloading():
if should_reload and should_reload():
return
check_callback(self)

View File

@@ -278,14 +278,12 @@ class MSGraphConnection(MailboxConnection):
# Not needed
pass
def watch(self, check_callback, check_timeout, config_reloading=None):
def watch(self, check_callback, check_timeout, should_reload=None):
"""Checks the mailbox for new messages every n seconds"""
while True:
if config_reloading and config_reloading():
if should_reload and should_reload():
return
sleep(check_timeout)
if config_reloading and config_reloading():
return
check_callback(self)
@lru_cache(maxsize=10)

View File

@@ -81,7 +81,7 @@ class IMAPConnection(MailboxConnection):
def keepalive(self):
self._client.noop()
def watch(self, check_callback, check_timeout, config_reloading=None):
def watch(self, check_callback, check_timeout, should_reload=None):
"""
Use an IDLE IMAP connection to parse incoming emails,
and pass the results to a callback function
@@ -94,8 +94,6 @@ class IMAPConnection(MailboxConnection):
check_callback(self)
while True:
if config_reloading and config_reloading():
return
try:
IMAPClient(
host=self._client.host,
@@ -113,5 +111,5 @@ class IMAPConnection(MailboxConnection):
except Exception as e:
logger.warning("IMAP connection error. {0}. Reconnecting...".format(e))
sleep(check_timeout)
if config_reloading and config_reloading():
if should_reload and should_reload():
return

View File

@@ -28,5 +28,5 @@ class MailboxConnection(ABC):
def keepalive(self):
raise NotImplementedError
def watch(self, check_callback, check_timeout, config_reloading=None):
def watch(self, check_callback, check_timeout, should_reload=None):
raise NotImplementedError

View File

@@ -63,14 +63,12 @@ class MaildirConnection(MailboxConnection):
def keepalive(self):
return
def watch(self, check_callback, check_timeout, config_reloading=None):
def watch(self, check_callback, check_timeout, should_reload=None):
while True:
if config_reloading and config_reloading():
return
try:
check_callback(self)
except Exception as e:
logger.warning("Maildir init error. {0}".format(e))
if config_reloading and config_reloading():
if should_reload and should_reload():
return
sleep(check_timeout)

View File

@@ -93,11 +93,3 @@ class S3Client(object):
self.bucket.put_object(
Body=json.dumps(report), Key=object_path, Metadata=object_metadata
)
def close(self):
"""Clean up the boto3 resource."""
try:
if self.s3.meta is not None:
self.s3.meta.client.close()
except Exception:
pass

View File

@@ -207,7 +207,3 @@ class HECClient(object):
raise SplunkError(e.__str__())
if response["code"] != 0:
raise SplunkError(response["text"])
def close(self):
"""Close the underlying HTTP session."""
self.session.close()

View File

@@ -12,11 +12,10 @@ from base64 import urlsafe_b64encode
from glob import glob
from pathlib import Path
from tempfile import NamedTemporaryFile, TemporaryDirectory
from typing import cast
from types import SimpleNamespace
from unittest.mock import MagicMock, patch
from lxml import etree # type: ignore[import-untyped]
from lxml import etree
from googleapiclient.errors import HttpError
from httplib2 import Response
from imapclient.exceptions import IMAPClientError
@@ -33,7 +32,6 @@ from parsedmarc.mail.imap import IMAPConnection
import parsedmarc.mail.gmail as gmail_module
import parsedmarc.mail.graph as graph_module
import parsedmarc.mail.imap as imap_module
import parsedmarc.elastic
import parsedmarc.opensearch as opensearch_module
import parsedmarc.utils
@@ -156,7 +154,7 @@ class Test(unittest.TestCase):
report_path,
offline=True,
)
assert result["report_type"] == "aggregate"
self.assertEqual(result["report_type"], "aggregate")
self.assertEqual(result["report"]["report_metadata"]["org_name"], "outlook.com")
def testParseReportFileAcceptsPathForEmail(self):
@@ -167,7 +165,7 @@ class Test(unittest.TestCase):
report_path,
offline=True,
)
assert result["report_type"] == "aggregate"
self.assertEqual(result["report_type"], "aggregate")
self.assertEqual(result["report"]["report_metadata"]["org_name"], "google.com")
def testAggregateSamples(self):
@@ -178,11 +176,10 @@ class Test(unittest.TestCase):
if os.path.isdir(sample_path):
continue
print("Testing {0}: ".format(sample_path), end="")
result = parsedmarc.parse_report_file(
parsed_report = parsedmarc.parse_report_file(
sample_path, always_use_local_files=True, offline=OFFLINE_MODE
)
assert result["report_type"] == "aggregate"
parsedmarc.parsed_aggregate_reports_to_csv(result["report"])
)["report"]
parsedmarc.parsed_aggregate_reports_to_csv(parsed_report)
print("Passed!")
def testEmptySample(self):
@@ -198,13 +195,13 @@ class Test(unittest.TestCase):
print("Testing {0}: ".format(sample_path), end="")
with open(sample_path) as sample_file:
sample_content = sample_file.read()
email_result = parsedmarc.parse_report_email(
parsed_report = parsedmarc.parse_report_email(
sample_content, offline=OFFLINE_MODE
)
assert email_result["report_type"] == "forensic"
result = parsedmarc.parse_report_file(sample_path, offline=OFFLINE_MODE)
assert result["report_type"] == "forensic"
parsedmarc.parsed_forensic_reports_to_csv(result["report"])
)["report"]
parsed_report = parsedmarc.parse_report_file(
sample_path, offline=OFFLINE_MODE
)["report"]
parsedmarc.parsed_forensic_reports_to_csv(parsed_report)
print("Passed!")
def testSmtpTlsSamples(self):
@@ -215,9 +212,10 @@ class Test(unittest.TestCase):
if os.path.isdir(sample_path):
continue
print("Testing {0}: ".format(sample_path), end="")
result = parsedmarc.parse_report_file(sample_path, offline=OFFLINE_MODE)
assert result["report_type"] == "smtp_tls"
parsedmarc.parsed_smtp_tls_reports_to_csv(result["report"])
parsed_report = parsedmarc.parse_report_file(
sample_path, offline=OFFLINE_MODE
)["report"]
parsedmarc.parsed_smtp_tls_reports_to_csv(parsed_report)
print("Passed!")
def testOpenSearchSigV4RequiresRegion(self):
@@ -1280,7 +1278,7 @@ class TestMailboxWatchSince(unittest.TestCase):
def testWatchInboxPassesSinceToMailboxFetch(self):
mailbox_connection = SimpleNamespace()
def fake_watch(check_callback, check_timeout, config_reloading=None):
def fake_watch(check_callback, check_timeout, should_reload=None):
check_callback(mailbox_connection)
raise _BreakLoop()
@@ -1291,9 +1289,7 @@ class TestMailboxWatchSince(unittest.TestCase):
) as mocked:
with self.assertRaises(_BreakLoop):
parsedmarc.watch_inbox(
mailbox_connection=cast(
parsedmarc.MailboxConnection, mailbox_connection
),
mailbox_connection=mailbox_connection,
callback=callback,
check_timeout=1,
batch_size=10,
@@ -1341,30 +1337,30 @@ since = 2d
self.assertEqual(mock_watch_inbox.call_args.kwargs.get("since"), "2d")
class _DummyMailboxConnection(parsedmarc.MailboxConnection):
class _DummyMailboxConnection:
def __init__(self):
self.fetch_calls: list[dict[str, object]] = []
self.fetch_calls = []
def create_folder(self, folder_name: str):
def create_folder(self, folder_name):
return None
def fetch_messages(self, reports_folder: str, **kwargs):
def fetch_messages(self, reports_folder, **kwargs):
self.fetch_calls.append({"reports_folder": reports_folder, **kwargs})
return []
def fetch_message(self, message_id) -> str:
def fetch_message(self, message_id, **kwargs):
return ""
def delete_message(self, message_id):
return None
def move_message(self, message_id, folder_name: str):
def move_message(self, message_id, folder_name):
return None
def keepalive(self):
return None
def watch(self, check_callback, check_timeout, config_reloading=None):
def watch(self, check_callback, check_timeout):
return None
@@ -1450,7 +1446,7 @@ mailbox = shared@example.com
parsedmarc.cli._main()
self.assertEqual(system_exit.exception.code, -1)
mock_logger.critical.assert_called_once_with(
mock_logger.error.assert_called_once_with(
"certificate_path setting missing from the msgraph config section"
)
mock_graph_connection.assert_not_called()
@@ -1522,7 +1518,7 @@ user = owner@example.com
parsedmarc.cli._main()
self.assertEqual(system_exit.exception.code, -1)
mock_logger.critical.assert_called_once_with(
mock_logger.error.assert_called_once_with(
"password setting missing from the msgraph config section"
)
mock_graph_connection.assert_not_called()
@@ -1563,7 +1559,7 @@ class TestMSGraphFolderFallback(unittest.TestCase):
def testWellKnownFolderFallback(self):
connection = MSGraphConnection.__new__(MSGraphConnection)
connection.mailbox_name = "shared@example.com"
connection._client = _FakeGraphClient() # type: ignore[assignment]
connection._client = _FakeGraphClient()
connection._request_with_retries = MagicMock(
side_effect=lambda method_name, *args, **kwargs: getattr(
connection._client, method_name
@@ -1583,7 +1579,7 @@ class TestMSGraphFolderFallback(unittest.TestCase):
def testUnknownFolderStillFails(self):
connection = MSGraphConnection.__new__(MSGraphConnection)
connection.mailbox_name = "shared@example.com"
connection._client = _FakeGraphClient() # type: ignore[assignment]
connection._client = _FakeGraphClient()
connection._request_with_retries = MagicMock(
side_effect=lambda method_name, *args, **kwargs: getattr(
connection._client, method_name
@@ -1679,7 +1675,7 @@ mailbox = shared@example.com
parsedmarc.cli._main()
self.assertEqual(system_exit.exception.code, -1)
mock_logger.critical.assert_called_once_with(
mock_logger.error.assert_called_once_with(
"client_secret setting missing from the msgraph config section"
)
mock_graph_connection.assert_not_called()
@@ -1711,7 +1707,7 @@ mailbox = shared@example.com
parsedmarc.cli._main()
self.assertEqual(system_exit.exception.code, -1)
mock_logger.critical.assert_called_once_with(
mock_logger.error.assert_called_once_with(
"tenant_id setting missing from the msgraph config section"
)
mock_graph_connection.assert_not_called()
@@ -1743,7 +1739,7 @@ tenant_id = tenant-id
parsedmarc.cli._main()
self.assertEqual(system_exit.exception.code, -1)
mock_logger.critical.assert_called_once_with(
mock_logger.error.assert_called_once_with(
"mailbox setting missing from the msgraph config section"
)
mock_graph_connection.assert_not_called()
@@ -1813,7 +1809,7 @@ mailbox = shared@example.com
parsedmarc.cli._main()
self.assertEqual(system_exit.exception.code, -1)
mock_logger.critical.assert_called_once_with(
mock_logger.error.assert_called_once_with(
"tenant_id setting missing from the msgraph config section"
)
mock_graph_connection.assert_not_called()
@@ -1844,7 +1840,7 @@ tenant_id = tenant-id
parsedmarc.cli._main()
self.assertEqual(system_exit.exception.code, -1)
mock_logger.critical.assert_called_once_with(
mock_logger.error.assert_called_once_with(
"mailbox setting missing from the msgraph config section"
)
mock_graph_connection.assert_not_called()
@@ -1876,7 +1872,7 @@ certificate_path = /tmp/msgraph-cert.pem
parsedmarc.cli._main()
self.assertEqual(system_exit.exception.code, -1)
mock_logger.critical.assert_called_once_with(
mock_logger.error.assert_called_once_with(
"tenant_id setting missing from the msgraph config section"
)
mock_graph_connection.assert_not_called()
@@ -1908,7 +1904,7 @@ certificate_path = /tmp/msgraph-cert.pem
parsedmarc.cli._main()
self.assertEqual(system_exit.exception.code, -1)
mock_logger.critical.assert_called_once_with(
mock_logger.error.assert_called_once_with(
"mailbox setting missing from the msgraph config section"
)
mock_graph_connection.assert_not_called()
@@ -1930,10 +1926,7 @@ password = pass
watch = true
"""
@unittest.skipUnless(
hasattr(signal, "SIGHUP"),
"SIGHUP not available on this platform",
)
@unittest.skipUnless(hasattr(signal, "SIGHUP"), "SIGHUP not available on this platform")
@patch("parsedmarc.cli._init_output_clients")
@patch("parsedmarc.cli._parse_config_file")
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
@@ -1999,10 +1992,7 @@ watch = true
# _parse_config_file called for initial load + reload
self.assertGreaterEqual(mock_parse_config.call_count, 2)
@unittest.skipUnless(
hasattr(signal, "SIGHUP"),
"SIGHUP not available on this platform",
)
@unittest.skipUnless(hasattr(signal, "SIGHUP"), "SIGHUP not available on this platform")
@patch("parsedmarc.cli._init_output_clients")
@patch("parsedmarc.cli._parse_config_file")
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
@@ -2075,10 +2065,7 @@ watch = true
# The failed reload must not have closed the original clients
initial_clients["s3_client"].close.assert_not_called()
@unittest.skipUnless(
hasattr(signal, "SIGHUP"),
"SIGHUP not available on this platform",
)
@unittest.skipUnless(hasattr(signal, "SIGHUP"), "SIGHUP not available on this platform")
@patch("parsedmarc.cli._init_output_clients")
@patch("parsedmarc.cli._parse_config_file")
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
@@ -2150,10 +2137,7 @@ watch = true
# Old client must have been closed when reload succeeded
old_client.close.assert_called_once()
@unittest.skipUnless(
hasattr(signal, "SIGHUP"),
"SIGHUP not available on this platform",
)
@unittest.skipUnless(hasattr(signal, "SIGHUP"), "SIGHUP not available on this platform")
@patch("parsedmarc.cli._init_output_clients")
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
@patch("parsedmarc.cli.watch_inbox")