Compare commits

..

2 Commits

Author SHA1 Message Date
Sean Whalen
9849598100 Formatting 2026-03-21 16:17:35 -04:00
Sean Whalen
e82f3e58a1 SIGHUP-based configuration reload for watch mode (#697)
* Enhance mailbox connection watch method to support reload functionality

- Updated the `watch` method in `GmailConnection`, `MSGraphConnection`, `IMAPConnection`, `MaildirConnection`, and the abstract `MailboxConnection` class to accept an optional `should_reload` parameter. This allows the method to check if a reload is necessary and exit the loop if so.
- Modified related tests to accommodate the new method signature.
- Changed logger calls from `critical` to `error` for consistency in logging severity.
- Added a new settings file for Claude with specific permissions for testing and code checks.

* Update parsedmarc/cli.py

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Update parsedmarc/cli.py

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* [WIP] SIGHUP-based configuration reload for watch mode (#698)

* Initial plan

* Fix reload state consistency, resource leaks, stale opts; add tests

Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
Agent-Logs-Url: https://github.com/domainaware/parsedmarc/sessions/3c2e0bb9-7e2d-4efa-aef6-d2b98478b921

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>

* [WIP] SIGHUP-based configuration reload for watch mode (#699)

* Initial plan

* Fix review comments: ConfigurationError wrapping, duplicate parse args, bool parsing, Kafka required topics, should_reload kwarg, SIGHUP test skips

Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
Agent-Logs-Url: https://github.com/domainaware/parsedmarc/sessions/0779003c-ccbe-4d76-9748-801dbc238b96

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>

* SIGHUP-based configuration reload: address review feedback (#700)

* Initial plan

* Address review feedback: kafka_ssl, duplicate silent, exception chain, log file reload, should_reload timing

Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
Agent-Logs-Url: https://github.com/domainaware/parsedmarc/sessions/a8a43c55-23fa-4471-abe6-7ac966f381f9

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>

* Update parsedmarc/cli.py

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Best-effort initialization for optional output clients in watch mode (#701)

* Initial plan

* Wrap optional output client init in try/except for best-effort initialization

Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
Agent-Logs-Url: https://github.com/domainaware/parsedmarc/sessions/59241d4e-1b05-4a92-b2d2-e6d13d10a4fd

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>

* Fix SIGHUP reload tight-loop in watch mode (#702)

* Initial plan

* Fix _reload_requested tight-loop: reset flag before reload to capture concurrent SIGHUPs

Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
Agent-Logs-Url: https://github.com/domainaware/parsedmarc/sessions/879d0bb1-9037-41f7-bc89-f59611956d2e

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>

* Update parsedmarc/cli.py

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Fix resource leak when HEC config is invalid in `_init_output_clients()` (#703)

* Initial plan

* Fix resource leak: validate HEC settings before creating any output clients

Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
Agent-Logs-Url: https://github.com/domainaware/parsedmarc/sessions/38c73e09-789d-4d41-b75e-bbc61418859d

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>

* Ensure SIGHUP never triggers a new email batch across all watch() implementations (#704)

* Initial plan

* Ensure SIGHUP never starts a new email batch in any watch() implementation

Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
Agent-Logs-Url: https://github.com/domainaware/parsedmarc/sessions/45d5be30-8f6b-4200-9bdd-15c655033f17

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>

* SIGHUP-based config reload for watch mode: address review feedback (#705)

* Initial plan

* Address review feedback: Kafka SSL context, SIGHUP handler safety, test formatting

Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
Agent-Logs-Url: https://github.com/domainaware/parsedmarc/sessions/8f2fd48f-32a4-4258-9a89-06f7c7ac29bf

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>

* Reverted changes by copilot that turned errors into warnings

* Enhance usage documentation for config reload: clarify behavior on successful reload and error handling

* Update CHANGELOG.md to reflect config reload enhancements

* Add pytest command to settings for silent output during testing

* Enhance resource management: add close methods for S3Client and HECClient, and improve IMAP connection handling during IDLE. Update CHANGELOG.md for config reload improvements and bug fixes.

* Update changelog to not include fixes within the same unreleased version

* Refactor changelog entries for clarity and consistency in configuration reload section

* Fix changelog entry for msgraph configuration check

* Update CHANGELOG..md

* make single list items on one line in the changelog instead of doing hard wraps

* Remove incorrect IMAP changes

* Rename 'should_reload' parameter to 'config_reloading' in mailbox connection methods for clarity

* Restore startup configuration checks

* Improve error logging for Elasticsearch and OpenSearch exceptions

* Bump version to 9.3.0 in constants.py

* Refactor GelfClient methods to use specific report types instead of generic dicts

* Refactor tests to use assertions consistently and improve type hints

---------

Co-authored-by: Sean Whalen <seanthegeek@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <198982749+Copilot@users.noreply.github.com>
2026-03-21 16:14:48 -04:00
15 changed files with 262 additions and 204 deletions

View File

@@ -7,7 +7,8 @@
"Bash(GITHUB_ACTIONS=true pytest --cov tests.py)",
"Bash(ls tests*)",
"Bash(GITHUB_ACTIONS=true python -m pytest --cov tests.py -x)",
"Bash(GITHUB_ACTIONS=true python -m pytest tests.py -x -v)"
"Bash(GITHUB_ACTIONS=true python -m pytest tests.py -x -v)",
"Bash(python -m pytest tests.py --no-header -q)"
],
"additionalDirectories": [
"/tmp"

View File

@@ -4,20 +4,25 @@
### Added
- SIGHUP-based configuration reload for watch mode — update output
destinations, DNS/GeoIP settings, processing flags, and log level
without restarting the service or interrupting in-progress report
processing. Use `systemctl reload parsedmarc` when running under
systemd.
- Extracted `_parse_config_file()` and `_init_output_clients()` from
`_main()` in `cli.py` to support config reload and reduce code
duplication.
- SIGHUP-based configuration reload for watch mode — update output destinations, DNS/GeoIP settings, processing flags, and log level without restarting the service or interrupting in-progress report processing.
- Use `systemctl reload parsedmarc` when running under `systemd`.
- On a successful reload, old output clients are closed and recreated.
- On a failed reload, the previous configuration remains fully active.
- `close()` methods on `GelfClient`, `KafkaClient`, `SyslogClient`, `WebhookClient`, HECClient, and `S3Client` for clean resource teardown on reload.
- `config_reloading` parameter on all `MailboxConnection.watch()` implementations and `watch_inbox()` to ensure SIGHUP never triggers a new email batch mid-reload.
- Elasticsearch and OpenSearch connections are now tracked and cleaned up on reload via `_close_output_clients()`.
- Extracted `_parse_config_file()` and `_init_output_clients()` from `_main()` in `cli.py` to support config reload and reduce code duplication.
### Fixed
- `get_index_prefix()` crashed on forensic reports with `TypeError` due to `report()` instead of `report[]` dict access.
- Missing `exit(1)` after IMAP user/password validation failure allowed execution to continue with `None` credentials.
## 9.2.1
### Added
- Better checking of `msconfig` configuration (PR #695)
- Better checking of `msgraph` configuration (PR #695)
### Changed

View File

@@ -666,8 +666,15 @@ Mailbox connection settings (IMAP host/credentials, Microsoft Graph,
Gmail API, Maildir path) are **not** reloaded — changing those still
requires a full restart.
If the new configuration file contains errors, the reload is aborted
and the previous configuration remains active. Check the logs for
On a **successful** reload, existing output client connections are
closed and new ones are created from the updated configuration. The
service then resumes watching with the new settings.
If the new configuration file contains errors (missing required
settings, unreachable output destinations, etc.), the **entire reload
is aborted** — no output clients are replaced and the previous
configuration remains fully active. This means a typo in one section
will not take down an otherwise working setup. Check the logs for
details:
```bash

View File

@@ -2195,7 +2195,7 @@ def watch_inbox(
batch_size: int = 10,
since: Optional[Union[datetime, date, str]] = None,
normalize_timespan_threshold_hours: float = 24,
should_reload: Optional[Callable] = None,
config_reloading: Optional[Callable] = None,
):
"""
Watches the mailbox for new messages and
@@ -2223,7 +2223,7 @@ def watch_inbox(
batch_size (int): Number of messages to read and process before saving
since: Search for messages since certain time
normalize_timespan_threshold_hours (float): Normalize timespans beyond this
should_reload: Optional callable that returns True when a config
config_reloading: Optional callable that returns True when a config
reload has been requested (e.g. via SIGHUP)
"""
@@ -2253,8 +2253,8 @@ def watch_inbox(
"check_callback": check_callback,
"check_timeout": check_timeout,
}
if should_reload is not None:
watch_kwargs["should_reload"] = should_reload
if config_reloading is not None:
watch_kwargs["config_reloading"] = config_reloading
mailbox_connection.watch(**watch_kwargs)

View File

@@ -201,21 +201,8 @@ def _parse_config_file(config_file, opts):
"normalize_timespan_threshold_hours"
)
if "index_prefix_domain_map" in general_config:
map_path = general_config["index_prefix_domain_map"]
try:
with open(map_path) as f:
index_prefix_domain_map = yaml.safe_load(f)
except OSError as exc:
raise ConfigurationError(
"Failed to read index_prefix_domain_map file '{0}': {1}".format(
map_path, exc
)
) from exc
except yaml.YAMLError as exc:
raise ConfigurationError(
"Failed to parse YAML in index_prefix_domain_map "
"file '{0}': {1}".format(map_path, exc)
) from exc
with open(general_config["index_prefix_domain_map"]) as f:
index_prefix_domain_map = yaml.safe_load(f)
if "offline" in general_config:
opts.offline = bool(general_config.getboolean("offline"))
if "strip_attachment_payloads" in general_config:
@@ -760,15 +747,15 @@ def _parse_config_file(config_file, opts):
if "oauth2_port" in gmail_api_config:
opts.gmail_api_oauth2_port = gmail_api_config.getint("oauth2_port", 8080)
if "auth_mode" in gmail_api_config:
opts.gmail_api_auth_mode = gmail_api_config.get("auth_mode").strip()
opts.gmail_api_auth_mode = gmail_api_config["auth_mode"].strip()
if "service_account_user" in gmail_api_config:
opts.gmail_api_service_account_user = gmail_api_config.get(
opts.gmail_api_service_account_user = gmail_api_config[
"service_account_user"
).strip()
].strip()
elif "delegated_user" in gmail_api_config:
opts.gmail_api_service_account_user = gmail_api_config.get(
opts.gmail_api_service_account_user = gmail_api_config[
"delegated_user"
).strip()
].strip()
if "maildir" in config.sections():
maildir_api_config = config["maildir"]
@@ -823,6 +810,38 @@ def _parse_config_file(config_file, opts):
return index_prefix_domain_map
class _ElasticsearchHandle:
"""Sentinel so Elasticsearch participates in _close_output_clients."""
def close(self):
try:
conn = elastic.connections.get_connection()
if not isinstance(conn, str):
conn.close()
except Exception:
pass
try:
elastic.connections.remove_connection("default")
except Exception:
pass
class _OpenSearchHandle:
"""Sentinel so OpenSearch participates in _close_output_clients."""
def close(self):
try:
conn = opensearch.connections.get_connection()
if not isinstance(conn, str):
conn.close()
except Exception:
pass
try:
opensearch.connections.remove_connection("default")
except Exception:
pass
def _init_output_clients(opts):
"""Create output clients based on current opts.
@@ -834,6 +853,83 @@ def _init_output_clients(opts):
"""
clients = {}
if opts.s3_bucket:
clients["s3_client"] = s3.S3Client(
bucket_name=opts.s3_bucket,
bucket_path=opts.s3_path,
region_name=opts.s3_region_name,
endpoint_url=opts.s3_endpoint_url,
access_key_id=opts.s3_access_key_id,
secret_access_key=opts.s3_secret_access_key,
)
if opts.syslog_server:
clients["syslog_client"] = syslog.SyslogClient(
server_name=opts.syslog_server,
server_port=int(opts.syslog_port),
protocol=opts.syslog_protocol or "udp",
cafile_path=opts.syslog_cafile_path,
certfile_path=opts.syslog_certfile_path,
keyfile_path=opts.syslog_keyfile_path,
timeout=opts.syslog_timeout if opts.syslog_timeout is not None else 5.0,
retry_attempts=opts.syslog_retry_attempts
if opts.syslog_retry_attempts is not None
else 3,
retry_delay=opts.syslog_retry_delay
if opts.syslog_retry_delay is not None
else 5,
)
if opts.hec:
if opts.hec_token is None or opts.hec_index is None:
raise ConfigurationError(
"HEC token and HEC index are required when using HEC URL"
)
verify = True
if opts.hec_skip_certificate_verification:
verify = False
clients["hec_client"] = splunk.HECClient(
opts.hec, opts.hec_token, opts.hec_index, verify=verify
)
if opts.kafka_hosts:
ssl_context = None
if opts.kafka_skip_certificate_verification:
logger.debug("Skipping Kafka certificate verification")
ssl_context = create_default_context()
ssl_context.check_hostname = False
ssl_context.verify_mode = CERT_NONE
clients["kafka_client"] = kafkaclient.KafkaClient(
opts.kafka_hosts,
username=opts.kafka_username,
password=opts.kafka_password,
ssl_context=ssl_context,
)
if opts.gelf_host:
clients["gelf_client"] = gelf.GelfClient(
host=opts.gelf_host,
port=int(opts.gelf_port),
mode=opts.gelf_mode,
)
if (
opts.webhook_aggregate_url
or opts.webhook_forensic_url
or opts.webhook_smtp_tls_url
):
clients["webhook_client"] = webhook.WebhookClient(
aggregate_url=opts.webhook_aggregate_url,
forensic_url=opts.webhook_forensic_url,
smtp_tls_url=opts.webhook_smtp_tls_url,
timeout=opts.webhook_timeout,
)
# Elasticsearch and OpenSearch mutate module-level global state via
# connections.create_connection(), which cannot be rolled back if a later
# step fails. Initialise them last so that all other clients are created
# successfully first; this minimises the window for partial-init problems
# during config reload.
if opts.save_aggregate or opts.save_forensic or opts.save_smtp_tls:
if opts.elasticsearch_hosts:
es_aggregate_index = "dmarc_aggregate"
@@ -867,6 +963,7 @@ def _init_output_clients(opts):
aggregate_indexes=[es_aggregate_index],
forensic_indexes=[es_forensic_index],
)
clients["elasticsearch"] = _ElasticsearchHandle()
if opts.opensearch_hosts:
os_aggregate_index = "dmarc_aggregate"
@@ -903,104 +1000,7 @@ def _init_output_clients(opts):
aggregate_indexes=[os_aggregate_index],
forensic_indexes=[os_forensic_index],
)
if opts.s3_bucket:
try:
clients["s3_client"] = s3.S3Client(
bucket_name=opts.s3_bucket,
bucket_path=opts.s3_path,
region_name=opts.s3_region_name,
endpoint_url=opts.s3_endpoint_url,
access_key_id=opts.s3_access_key_id,
secret_access_key=opts.s3_secret_access_key,
)
except Exception:
logger.warning("Failed to initialize S3 client; skipping", exc_info=True)
if opts.syslog_server:
try:
clients["syslog_client"] = syslog.SyslogClient(
server_name=opts.syslog_server,
server_port=int(opts.syslog_port),
protocol=opts.syslog_protocol or "udp",
cafile_path=opts.syslog_cafile_path,
certfile_path=opts.syslog_certfile_path,
keyfile_path=opts.syslog_keyfile_path,
timeout=opts.syslog_timeout if opts.syslog_timeout is not None else 5.0,
retry_attempts=opts.syslog_retry_attempts
if opts.syslog_retry_attempts is not None
else 3,
retry_delay=opts.syslog_retry_delay
if opts.syslog_retry_delay is not None
else 5,
)
except Exception:
logger.warning(
"Failed to initialize syslog client; skipping", exc_info=True
)
if opts.hec:
if opts.hec_token is None or opts.hec_index is None:
raise ConfigurationError(
"HEC token and HEC index are required when using HEC URL"
)
verify = True
if opts.hec_skip_certificate_verification:
verify = False
try:
clients["hec_client"] = splunk.HECClient(
opts.hec, opts.hec_token, opts.hec_index, verify=verify
)
except Exception:
logger.warning(
"Failed to initialize Splunk HEC client; skipping", exc_info=True
)
if opts.kafka_hosts:
ssl_context = None
if opts.kafka_ssl:
ssl_context = create_default_context()
if opts.kafka_skip_certificate_verification:
logger.debug("Skipping Kafka certificate verification")
ssl_context.check_hostname = False
ssl_context.verify_mode = CERT_NONE
try:
clients["kafka_client"] = kafkaclient.KafkaClient(
opts.kafka_hosts,
username=opts.kafka_username,
password=opts.kafka_password,
ssl=opts.kafka_ssl,
ssl_context=ssl_context,
)
except Exception:
logger.warning("Failed to initialize Kafka client; skipping", exc_info=True)
if opts.gelf_host:
try:
clients["gelf_client"] = gelf.GelfClient(
host=opts.gelf_host,
port=int(opts.gelf_port),
mode=opts.gelf_mode,
)
except Exception:
logger.warning("Failed to initialize GELF client; skipping", exc_info=True)
if (
opts.webhook_aggregate_url
or opts.webhook_forensic_url
or opts.webhook_smtp_tls_url
):
try:
clients["webhook_client"] = webhook.WebhookClient(
aggregate_url=opts.webhook_aggregate_url,
forensic_url=opts.webhook_forensic_url,
smtp_tls_url=opts.webhook_smtp_tls_url,
timeout=opts.webhook_timeout,
)
except Exception:
logger.warning(
"Failed to initialize webhook client; skipping", exc_info=True
)
clients["opensearch"] = _OpenSearchHandle()
return clients
@@ -1032,7 +1032,7 @@ def _main():
if "policy_published" in report:
domain = report["policy_published"]["domain"]
elif "reported_domain" in report:
domain = report("reported_domain")
domain = report["reported_domain"]
elif "policies" in report:
domain = report["policies"][0]["domain"]
if domain:
@@ -1627,7 +1627,7 @@ def _main():
try:
index_prefix_domain_map = _parse_config_file(args.config_file, opts)
except ConfigurationError as e:
logger.error(str(e))
logger.critical(str(e))
exit(-1)
logger.setLevel(logging.ERROR)
@@ -1649,6 +1649,8 @@ def _main():
except Exception as error:
logger.warning("Unable to write to log file: {}".format(error))
opts.active_log_file = opts.log_file
if (
opts.imap_host is None
and opts.graph_client_id is None
@@ -1664,14 +1666,14 @@ def _main():
# Initialize output clients
try:
clients = _init_output_clients(opts)
except elastic.ElasticsearchError:
logger.exception("Elasticsearch Error")
except elastic.ElasticsearchError as e:
logger.exception("Elasticsearch Error: {0}".format(e))
exit(1)
except opensearch.OpenSearchError:
logger.exception("OpenSearch Error")
except opensearch.OpenSearchError as e:
logger.exception("OpenSearch Error: {0}".format(e))
exit(1)
except ConfigurationError as e:
logger.error(str(e))
logger.critical(str(e))
exit(1)
except Exception as error_:
logger.error("Output client error: {0}".format(error_))
@@ -1807,8 +1809,9 @@ def _main():
try:
if opts.imap_user is None or opts.imap_password is None:
logger.error(
"IMAP user and password must be specified ifhost is specified"
"IMAP user and password must be specified if host is specified"
)
exit(1)
ssl = True
verify = True
@@ -1985,8 +1988,9 @@ def _main():
def _handle_sighup(signum, frame):
nonlocal _reload_requested
# Logging is not async-signal-safe; only set the flag here.
# The log message is emitted from the main loop when the flag is read.
_reload_requested = True
logger.info("SIGHUP received, config will reload after current batch")
if hasattr(signal, "SIGHUP"):
signal.signal(signal.SIGHUP, _handle_sighup)
@@ -1995,6 +1999,12 @@ def _main():
logger.info("Watching for email - Quit with ctrl-c")
while True:
# Re-check mailbox_watch in case a config reload disabled watch mode
if not opts.mailbox_watch:
logger.info(
"Mailbox watch disabled in reloaded configuration, stopping watcher"
)
break
try:
watch_inbox(
mailbox_connection=mailbox_connection,
@@ -2015,7 +2025,7 @@ def _main():
reverse_dns_map_url=opts.reverse_dns_map_url,
offline=opts.offline,
normalize_timespan_threshold_hours=normalize_timespan_threshold_hours_value,
should_reload=lambda: _reload_requested,
config_reloading=lambda: _reload_requested,
)
except FileExistsError as error:
logger.error("{0}".format(error.__str__()))
@@ -2027,9 +2037,11 @@ def _main():
if not _reload_requested:
break
# Reload configuration — clear the flag first so that any new
# SIGHUP arriving while we reload will be captured for the next
# iteration rather than being silently dropped.
# Reload configuration — emit the log message here (not in the
# signal handler, which is not async-signal-safe), then clear the
# flag so that any new SIGHUP arriving while we reload will be
# captured for the next iteration rather than being silently dropped.
logger.info("SIGHUP received, config will reload after current batch")
_reload_requested = False
logger.info("Reloading configuration...")
try:

View File

@@ -1,3 +1,3 @@
__version__ = "9.2.1"
__version__ = "9.3.0"
USER_AGENT = f"parsedmarc/{__version__}"

View File

@@ -3,9 +3,7 @@
from __future__ import annotations
import logging
import logging.handlers
import threading
from typing import Any
from pygelf import GelfTcpHandler, GelfTlsHandler, GelfUdpHandler
@@ -14,6 +12,7 @@ from parsedmarc import (
parsed_forensic_reports_to_csv_rows,
parsed_smtp_tls_reports_to_csv_rows,
)
from parsedmarc.types import AggregateReport, ForensicReport, SMTPTLSReport
log_context_data = threading.local()
@@ -37,7 +36,7 @@ class GelfClient(object):
"""
self.host = host
self.port = port
self.logger = logging.getLogger("parsedmarc_syslog")
self.logger = logging.getLogger("parsedmarc_gelf")
self.logger.setLevel(logging.INFO)
self.logger.addFilter(ContextFilter())
self.gelf_mode = {
@@ -50,7 +49,7 @@ class GelfClient(object):
)
self.logger.addHandler(self.handler)
def save_aggregate_report_to_gelf(self, aggregate_reports: list[dict[str, Any]]):
def save_aggregate_report_to_gelf(self, aggregate_reports: list[AggregateReport]):
rows = parsed_aggregate_reports_to_csv_rows(aggregate_reports)
for row in rows:
log_context_data.parsedmarc = row
@@ -58,13 +57,13 @@ class GelfClient(object):
log_context_data.parsedmarc = None
def save_forensic_report_to_gelf(self, forensic_reports: list[dict[str, Any]]):
def save_forensic_report_to_gelf(self, forensic_reports: list[ForensicReport]):
rows = parsed_forensic_reports_to_csv_rows(forensic_reports)
for row in rows:
log_context_data.parsedmarc = row
self.logger.info("parsedmarc forensic report")
def save_smtp_tls_report_to_gelf(self, smtp_tls_reports: dict[str, Any]):
def save_smtp_tls_report_to_gelf(self, smtp_tls_reports: SMTPTLSReport):
rows = parsed_smtp_tls_reports_to_csv_rows(smtp_tls_reports)
for row in rows:
log_context_data.parsedmarc = row

View File

@@ -175,13 +175,13 @@ class GmailConnection(MailboxConnection):
# Not needed
pass
def watch(self, check_callback, check_timeout, should_reload=None):
def watch(self, check_callback, check_timeout, config_reloading=None):
"""Checks the mailbox for new messages every n seconds"""
while True:
if should_reload and should_reload():
if config_reloading and config_reloading():
return
sleep(check_timeout)
if should_reload and should_reload():
if config_reloading and config_reloading():
return
check_callback(self)

View File

@@ -278,12 +278,14 @@ class MSGraphConnection(MailboxConnection):
# Not needed
pass
def watch(self, check_callback, check_timeout, should_reload=None):
def watch(self, check_callback, check_timeout, config_reloading=None):
"""Checks the mailbox for new messages every n seconds"""
while True:
if should_reload and should_reload():
if config_reloading and config_reloading():
return
sleep(check_timeout)
if config_reloading and config_reloading():
return
check_callback(self)
@lru_cache(maxsize=10)

View File

@@ -81,7 +81,7 @@ class IMAPConnection(MailboxConnection):
def keepalive(self):
self._client.noop()
def watch(self, check_callback, check_timeout, should_reload=None):
def watch(self, check_callback, check_timeout, config_reloading=None):
"""
Use an IDLE IMAP connection to parse incoming emails,
and pass the results to a callback function
@@ -94,6 +94,8 @@ class IMAPConnection(MailboxConnection):
check_callback(self)
while True:
if config_reloading and config_reloading():
return
try:
IMAPClient(
host=self._client.host,
@@ -111,5 +113,5 @@ class IMAPConnection(MailboxConnection):
except Exception as e:
logger.warning("IMAP connection error. {0}. Reconnecting...".format(e))
sleep(check_timeout)
if should_reload and should_reload():
if config_reloading and config_reloading():
return

View File

@@ -28,5 +28,5 @@ class MailboxConnection(ABC):
def keepalive(self):
raise NotImplementedError
def watch(self, check_callback, check_timeout, should_reload=None):
def watch(self, check_callback, check_timeout, config_reloading=None):
raise NotImplementedError

View File

@@ -63,12 +63,14 @@ class MaildirConnection(MailboxConnection):
def keepalive(self):
return
def watch(self, check_callback, check_timeout, should_reload=None):
def watch(self, check_callback, check_timeout, config_reloading=None):
while True:
if config_reloading and config_reloading():
return
try:
check_callback(self)
except Exception as e:
logger.warning("Maildir init error. {0}".format(e))
if should_reload and should_reload():
if config_reloading and config_reloading():
return
sleep(check_timeout)

View File

@@ -93,3 +93,11 @@ class S3Client(object):
self.bucket.put_object(
Body=json.dumps(report), Key=object_path, Metadata=object_metadata
)
def close(self):
"""Clean up the boto3 resource."""
try:
if self.s3.meta is not None:
self.s3.meta.client.close()
except Exception:
pass

View File

@@ -207,3 +207,7 @@ class HECClient(object):
raise SplunkError(e.__str__())
if response["code"] != 0:
raise SplunkError(response["text"])
def close(self):
"""Close the underlying HTTP session."""
self.session.close()

View File

@@ -12,10 +12,11 @@ from base64 import urlsafe_b64encode
from glob import glob
from pathlib import Path
from tempfile import NamedTemporaryFile, TemporaryDirectory
from typing import cast
from types import SimpleNamespace
from unittest.mock import MagicMock, patch
from lxml import etree
from lxml import etree # type: ignore[import-untyped]
from googleapiclient.errors import HttpError
from httplib2 import Response
from imapclient.exceptions import IMAPClientError
@@ -32,6 +33,7 @@ from parsedmarc.mail.imap import IMAPConnection
import parsedmarc.mail.gmail as gmail_module
import parsedmarc.mail.graph as graph_module
import parsedmarc.mail.imap as imap_module
import parsedmarc.elastic
import parsedmarc.opensearch as opensearch_module
import parsedmarc.utils
@@ -154,7 +156,7 @@ class Test(unittest.TestCase):
report_path,
offline=True,
)
self.assertEqual(result["report_type"], "aggregate")
assert result["report_type"] == "aggregate"
self.assertEqual(result["report"]["report_metadata"]["org_name"], "outlook.com")
def testParseReportFileAcceptsPathForEmail(self):
@@ -165,7 +167,7 @@ class Test(unittest.TestCase):
report_path,
offline=True,
)
self.assertEqual(result["report_type"], "aggregate")
assert result["report_type"] == "aggregate"
self.assertEqual(result["report"]["report_metadata"]["org_name"], "google.com")
def testAggregateSamples(self):
@@ -176,10 +178,11 @@ class Test(unittest.TestCase):
if os.path.isdir(sample_path):
continue
print("Testing {0}: ".format(sample_path), end="")
parsed_report = parsedmarc.parse_report_file(
result = parsedmarc.parse_report_file(
sample_path, always_use_local_files=True, offline=OFFLINE_MODE
)["report"]
parsedmarc.parsed_aggregate_reports_to_csv(parsed_report)
)
assert result["report_type"] == "aggregate"
parsedmarc.parsed_aggregate_reports_to_csv(result["report"])
print("Passed!")
def testEmptySample(self):
@@ -195,13 +198,13 @@ class Test(unittest.TestCase):
print("Testing {0}: ".format(sample_path), end="")
with open(sample_path) as sample_file:
sample_content = sample_file.read()
parsed_report = parsedmarc.parse_report_email(
email_result = parsedmarc.parse_report_email(
sample_content, offline=OFFLINE_MODE
)["report"]
parsed_report = parsedmarc.parse_report_file(
sample_path, offline=OFFLINE_MODE
)["report"]
parsedmarc.parsed_forensic_reports_to_csv(parsed_report)
)
assert email_result["report_type"] == "forensic"
result = parsedmarc.parse_report_file(sample_path, offline=OFFLINE_MODE)
assert result["report_type"] == "forensic"
parsedmarc.parsed_forensic_reports_to_csv(result["report"])
print("Passed!")
def testSmtpTlsSamples(self):
@@ -212,10 +215,9 @@ class Test(unittest.TestCase):
if os.path.isdir(sample_path):
continue
print("Testing {0}: ".format(sample_path), end="")
parsed_report = parsedmarc.parse_report_file(
sample_path, offline=OFFLINE_MODE
)["report"]
parsedmarc.parsed_smtp_tls_reports_to_csv(parsed_report)
result = parsedmarc.parse_report_file(sample_path, offline=OFFLINE_MODE)
assert result["report_type"] == "smtp_tls"
parsedmarc.parsed_smtp_tls_reports_to_csv(result["report"])
print("Passed!")
def testOpenSearchSigV4RequiresRegion(self):
@@ -1278,7 +1280,7 @@ class TestMailboxWatchSince(unittest.TestCase):
def testWatchInboxPassesSinceToMailboxFetch(self):
mailbox_connection = SimpleNamespace()
def fake_watch(check_callback, check_timeout, should_reload=None):
def fake_watch(check_callback, check_timeout, config_reloading=None):
check_callback(mailbox_connection)
raise _BreakLoop()
@@ -1289,7 +1291,9 @@ class TestMailboxWatchSince(unittest.TestCase):
) as mocked:
with self.assertRaises(_BreakLoop):
parsedmarc.watch_inbox(
mailbox_connection=mailbox_connection,
mailbox_connection=cast(
parsedmarc.MailboxConnection, mailbox_connection
),
callback=callback,
check_timeout=1,
batch_size=10,
@@ -1337,30 +1341,30 @@ since = 2d
self.assertEqual(mock_watch_inbox.call_args.kwargs.get("since"), "2d")
class _DummyMailboxConnection:
class _DummyMailboxConnection(parsedmarc.MailboxConnection):
def __init__(self):
self.fetch_calls = []
self.fetch_calls: list[dict[str, object]] = []
def create_folder(self, folder_name):
def create_folder(self, folder_name: str):
return None
def fetch_messages(self, reports_folder, **kwargs):
def fetch_messages(self, reports_folder: str, **kwargs):
self.fetch_calls.append({"reports_folder": reports_folder, **kwargs})
return []
def fetch_message(self, message_id, **kwargs):
def fetch_message(self, message_id) -> str:
return ""
def delete_message(self, message_id):
return None
def move_message(self, message_id, folder_name):
def move_message(self, message_id, folder_name: str):
return None
def keepalive(self):
return None
def watch(self, check_callback, check_timeout):
def watch(self, check_callback, check_timeout, config_reloading=None):
return None
@@ -1446,7 +1450,7 @@ mailbox = shared@example.com
parsedmarc.cli._main()
self.assertEqual(system_exit.exception.code, -1)
mock_logger.error.assert_called_once_with(
mock_logger.critical.assert_called_once_with(
"certificate_path setting missing from the msgraph config section"
)
mock_graph_connection.assert_not_called()
@@ -1518,7 +1522,7 @@ user = owner@example.com
parsedmarc.cli._main()
self.assertEqual(system_exit.exception.code, -1)
mock_logger.error.assert_called_once_with(
mock_logger.critical.assert_called_once_with(
"password setting missing from the msgraph config section"
)
mock_graph_connection.assert_not_called()
@@ -1559,7 +1563,7 @@ class TestMSGraphFolderFallback(unittest.TestCase):
def testWellKnownFolderFallback(self):
connection = MSGraphConnection.__new__(MSGraphConnection)
connection.mailbox_name = "shared@example.com"
connection._client = _FakeGraphClient()
connection._client = _FakeGraphClient() # type: ignore[assignment]
connection._request_with_retries = MagicMock(
side_effect=lambda method_name, *args, **kwargs: getattr(
connection._client, method_name
@@ -1579,7 +1583,7 @@ class TestMSGraphFolderFallback(unittest.TestCase):
def testUnknownFolderStillFails(self):
connection = MSGraphConnection.__new__(MSGraphConnection)
connection.mailbox_name = "shared@example.com"
connection._client = _FakeGraphClient()
connection._client = _FakeGraphClient() # type: ignore[assignment]
connection._request_with_retries = MagicMock(
side_effect=lambda method_name, *args, **kwargs: getattr(
connection._client, method_name
@@ -1675,7 +1679,7 @@ mailbox = shared@example.com
parsedmarc.cli._main()
self.assertEqual(system_exit.exception.code, -1)
mock_logger.error.assert_called_once_with(
mock_logger.critical.assert_called_once_with(
"client_secret setting missing from the msgraph config section"
)
mock_graph_connection.assert_not_called()
@@ -1707,7 +1711,7 @@ mailbox = shared@example.com
parsedmarc.cli._main()
self.assertEqual(system_exit.exception.code, -1)
mock_logger.error.assert_called_once_with(
mock_logger.critical.assert_called_once_with(
"tenant_id setting missing from the msgraph config section"
)
mock_graph_connection.assert_not_called()
@@ -1739,7 +1743,7 @@ tenant_id = tenant-id
parsedmarc.cli._main()
self.assertEqual(system_exit.exception.code, -1)
mock_logger.error.assert_called_once_with(
mock_logger.critical.assert_called_once_with(
"mailbox setting missing from the msgraph config section"
)
mock_graph_connection.assert_not_called()
@@ -1809,7 +1813,7 @@ mailbox = shared@example.com
parsedmarc.cli._main()
self.assertEqual(system_exit.exception.code, -1)
mock_logger.error.assert_called_once_with(
mock_logger.critical.assert_called_once_with(
"tenant_id setting missing from the msgraph config section"
)
mock_graph_connection.assert_not_called()
@@ -1840,7 +1844,7 @@ tenant_id = tenant-id
parsedmarc.cli._main()
self.assertEqual(system_exit.exception.code, -1)
mock_logger.error.assert_called_once_with(
mock_logger.critical.assert_called_once_with(
"mailbox setting missing from the msgraph config section"
)
mock_graph_connection.assert_not_called()
@@ -1872,7 +1876,7 @@ certificate_path = /tmp/msgraph-cert.pem
parsedmarc.cli._main()
self.assertEqual(system_exit.exception.code, -1)
mock_logger.error.assert_called_once_with(
mock_logger.critical.assert_called_once_with(
"tenant_id setting missing from the msgraph config section"
)
mock_graph_connection.assert_not_called()
@@ -1904,7 +1908,7 @@ certificate_path = /tmp/msgraph-cert.pem
parsedmarc.cli._main()
self.assertEqual(system_exit.exception.code, -1)
mock_logger.error.assert_called_once_with(
mock_logger.critical.assert_called_once_with(
"mailbox setting missing from the msgraph config section"
)
mock_graph_connection.assert_not_called()
@@ -1926,7 +1930,10 @@ password = pass
watch = true
"""
@unittest.skipUnless(hasattr(signal, "SIGHUP"), "SIGHUP not available on this platform")
@unittest.skipUnless(
hasattr(signal, "SIGHUP"),
"SIGHUP not available on this platform",
)
@patch("parsedmarc.cli._init_output_clients")
@patch("parsedmarc.cli._parse_config_file")
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
@@ -1992,7 +1999,10 @@ watch = true
# _parse_config_file called for initial load + reload
self.assertGreaterEqual(mock_parse_config.call_count, 2)
@unittest.skipUnless(hasattr(signal, "SIGHUP"), "SIGHUP not available on this platform")
@unittest.skipUnless(
hasattr(signal, "SIGHUP"),
"SIGHUP not available on this platform",
)
@patch("parsedmarc.cli._init_output_clients")
@patch("parsedmarc.cli._parse_config_file")
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
@@ -2065,7 +2075,10 @@ watch = true
# The failed reload must not have closed the original clients
initial_clients["s3_client"].close.assert_not_called()
@unittest.skipUnless(hasattr(signal, "SIGHUP"), "SIGHUP not available on this platform")
@unittest.skipUnless(
hasattr(signal, "SIGHUP"),
"SIGHUP not available on this platform",
)
@patch("parsedmarc.cli._init_output_clients")
@patch("parsedmarc.cli._parse_config_file")
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
@@ -2137,7 +2150,10 @@ watch = true
# Old client must have been closed when reload succeeded
old_client.close.assert_called_once()
@unittest.skipUnless(hasattr(signal, "SIGHUP"), "SIGHUP not available on this platform")
@unittest.skipUnless(
hasattr(signal, "SIGHUP"),
"SIGHUP not available on this platform",
)
@patch("parsedmarc.cli._init_output_clients")
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
@patch("parsedmarc.cli.watch_inbox")