Compare commits

..

2 Commits

Author SHA1 Message Date
Sean Whalen
9849598100 Formatting 2026-03-21 16:17:35 -04:00
Sean Whalen
e82f3e58a1 SIGHUP-based configuration reload for watch mode (#697)
* Enhance mailbox connection watch method to support reload functionality

- Updated the `watch` method in `GmailConnection`, `MSGraphConnection`, `IMAPConnection`, `MaildirConnection`, and the abstract `MailboxConnection` class to accept an optional `should_reload` parameter. This allows the method to check if a reload is necessary and exit the loop if so.
- Modified related tests to accommodate the new method signature.
- Changed logger calls from `critical` to `error` for consistency in logging severity.
- Added a new settings file for Claude with specific permissions for testing and code checks.

* Update parsedmarc/cli.py

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Update parsedmarc/cli.py

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* [WIP] SIGHUP-based configuration reload for watch mode (#698)

* Initial plan

* Fix reload state consistency, resource leaks, stale opts; add tests

Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
Agent-Logs-Url: https://github.com/domainaware/parsedmarc/sessions/3c2e0bb9-7e2d-4efa-aef6-d2b98478b921

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>

* [WIP] SIGHUP-based configuration reload for watch mode (#699)

* Initial plan

* Fix review comments: ConfigurationError wrapping, duplicate parse args, bool parsing, Kafka required topics, should_reload kwarg, SIGHUP test skips

Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
Agent-Logs-Url: https://github.com/domainaware/parsedmarc/sessions/0779003c-ccbe-4d76-9748-801dbc238b96

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>

* SIGHUP-based configuration reload: address review feedback (#700)

* Initial plan

* Address review feedback: kafka_ssl, duplicate silent, exception chain, log file reload, should_reload timing

Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
Agent-Logs-Url: https://github.com/domainaware/parsedmarc/sessions/a8a43c55-23fa-4471-abe6-7ac966f381f9

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>

* Update parsedmarc/cli.py

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Best-effort initialization for optional output clients in watch mode (#701)

* Initial plan

* Wrap optional output client init in try/except for best-effort initialization

Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
Agent-Logs-Url: https://github.com/domainaware/parsedmarc/sessions/59241d4e-1b05-4a92-b2d2-e6d13d10a4fd

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>

* Fix SIGHUP reload tight-loop in watch mode (#702)

* Initial plan

* Fix _reload_requested tight-loop: reset flag before reload to capture concurrent SIGHUPs

Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
Agent-Logs-Url: https://github.com/domainaware/parsedmarc/sessions/879d0bb1-9037-41f7-bc89-f59611956d2e

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>

* Update parsedmarc/cli.py

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Fix resource leak when HEC config is invalid in `_init_output_clients()` (#703)

* Initial plan

* Fix resource leak: validate HEC settings before creating any output clients

Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
Agent-Logs-Url: https://github.com/domainaware/parsedmarc/sessions/38c73e09-789d-4d41-b75e-bbc61418859d

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>

* Ensure SIGHUP never triggers a new email batch across all watch() implementations (#704)

* Initial plan

* Ensure SIGHUP never starts a new email batch in any watch() implementation

Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
Agent-Logs-Url: https://github.com/domainaware/parsedmarc/sessions/45d5be30-8f6b-4200-9bdd-15c655033f17

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>

* SIGHUP-based config reload for watch mode: address review feedback (#705)

* Initial plan

* Address review feedback: Kafka SSL context, SIGHUP handler safety, test formatting

Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
Agent-Logs-Url: https://github.com/domainaware/parsedmarc/sessions/8f2fd48f-32a4-4258-9a89-06f7c7ac29bf

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>

* Reverted changes by copilot that turned errors into warnings

* Enhance usage documentation for config reload: clarify behavior on successful reload and error handling

* Update CHANGELOG.md to reflect config reload enhancements

* Add pytest command to settings for silent output during testing

* Enhance resource management: add close methods for S3Client and HECClient, and improve IMAP connection handling during IDLE. Update CHANGELOG.md for config reload improvements and bug fixes.

* Update changelog to not include fixes within the same unreleased version

* Refactor changelog entries for clarity and consistency in configuration reload section

* Fix changelog entry for msgraph configuration check

* Update CHANGELOG..md

* make single list items on one line in the changelog instead of doing hard wraps

* Remove incorrect IMAP changes

* Rename 'should_reload' parameter to 'config_reloading' in mailbox connection methods for clarity

* Restore startup configuration checks

* Improve error logging for Elasticsearch and OpenSearch exceptions

* Bump version to 9.3.0 in constants.py

* Refactor GelfClient methods to use specific report types instead of generic dicts

* Refactor tests to use assertions consistently and improve type hints

---------

Co-authored-by: Sean Whalen <seanthegeek@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <198982749+Copilot@users.noreply.github.com>
2026-03-21 16:14:48 -04:00
15 changed files with 303 additions and 172 deletions

View File

@@ -7,7 +7,8 @@
"Bash(GITHUB_ACTIONS=true pytest --cov tests.py)",
"Bash(ls tests*)",
"Bash(GITHUB_ACTIONS=true python -m pytest --cov tests.py -x)",
"Bash(GITHUB_ACTIONS=true python -m pytest tests.py -x -v)"
"Bash(GITHUB_ACTIONS=true python -m pytest tests.py -x -v)",
"Bash(python -m pytest tests.py --no-header -q)"
],
"additionalDirectories": [
"/tmp"

View File

@@ -4,20 +4,25 @@
### Added
- SIGHUP-based configuration reload for watch mode — update output
destinations, DNS/GeoIP settings, processing flags, and log level
without restarting the service or interrupting in-progress report
processing. Use `systemctl reload parsedmarc` when running under
systemd.
- Extracted `_parse_config_file()` and `_init_output_clients()` from
`_main()` in `cli.py` to support config reload and reduce code
duplication.
- SIGHUP-based configuration reload for watch mode — update output destinations, DNS/GeoIP settings, processing flags, and log level without restarting the service or interrupting in-progress report processing.
- Use `systemctl reload parsedmarc` when running under `systemd`.
- On a successful reload, old output clients are closed and recreated.
- On a failed reload, the previous configuration remains fully active.
- `close()` methods on `GelfClient`, `KafkaClient`, `SyslogClient`, `WebhookClient`, HECClient, and `S3Client` for clean resource teardown on reload.
- `config_reloading` parameter on all `MailboxConnection.watch()` implementations and `watch_inbox()` to ensure SIGHUP never triggers a new email batch mid-reload.
- Elasticsearch and OpenSearch connections are now tracked and cleaned up on reload via `_close_output_clients()`.
- Extracted `_parse_config_file()` and `_init_output_clients()` from `_main()` in `cli.py` to support config reload and reduce code duplication.
### Fixed
- `get_index_prefix()` crashed on forensic reports with `TypeError` due to `report()` instead of `report[]` dict access.
- Missing `exit(1)` after IMAP user/password validation failure allowed execution to continue with `None` credentials.
## 9.2.1
### Added
- Better checking of `msconfig` configuration (PR #695)
- Better checking of `msgraph` configuration (PR #695)
### Changed

View File

@@ -666,8 +666,15 @@ Mailbox connection settings (IMAP host/credentials, Microsoft Graph,
Gmail API, Maildir path) are **not** reloaded — changing those still
requires a full restart.
If the new configuration file contains errors, the reload is aborted
and the previous configuration remains active. Check the logs for
On a **successful** reload, existing output client connections are
closed and new ones are created from the updated configuration. The
service then resumes watching with the new settings.
If the new configuration file contains errors (missing required
settings, unreachable output destinations, etc.), the **entire reload
is aborted** — no output clients are replaced and the previous
configuration remains fully active. This means a typo in one section
will not take down an otherwise working setup. Check the logs for
details:
```bash

View File

@@ -2195,7 +2195,7 @@ def watch_inbox(
batch_size: int = 10,
since: Optional[Union[datetime, date, str]] = None,
normalize_timespan_threshold_hours: float = 24,
should_reload: Optional[Callable] = None,
config_reloading: Optional[Callable] = None,
):
"""
Watches the mailbox for new messages and
@@ -2223,7 +2223,7 @@ def watch_inbox(
batch_size (int): Number of messages to read and process before saving
since: Search for messages since certain time
normalize_timespan_threshold_hours (float): Normalize timespans beyond this
should_reload: Optional callable that returns True when a config
config_reloading: Optional callable that returns True when a config
reload has been requested (e.g. via SIGHUP)
"""
@@ -2249,11 +2249,14 @@ def watch_inbox(
)
callback(res)
mailbox_connection.watch(
check_callback=check_callback,
check_timeout=check_timeout,
should_reload=should_reload,
)
watch_kwargs: dict = {
"check_callback": check_callback,
"check_timeout": check_timeout,
}
if config_reloading is not None:
watch_kwargs["config_reloading"] = config_reloading
mailbox_connection.watch(**watch_kwargs)
def append_json(

View File

@@ -242,7 +242,7 @@ def _parse_config_file(config_file, opts):
except Exception as ns_error:
raise ConfigurationError(
"DNS pre-flight check failed: {}".format(ns_error)
)
) from ns_error
if not dummy_hostname:
raise ConfigurationError(
"DNS pre-flight check failed: no PTR record for {} from {}".format(
@@ -259,8 +259,6 @@ def _parse_config_file(config_file, opts):
opts.debug = bool(general_config.getboolean("debug"))
if "verbose" in general_config:
opts.verbose = bool(general_config.getboolean("verbose"))
if "silent" in general_config:
opts.silent = bool(general_config.getboolean("silent"))
if "warnings" in general_config:
opts.warnings = bool(general_config.getboolean("warnings"))
if "fail_on_output_error" in general_config:
@@ -588,9 +586,9 @@ def _parse_config_file(config_file, opts):
"index setting missing from the splunk_hec config section"
)
if "skip_certificate_verification" in hec_config:
opts.hec_skip_certificate_verification = hec_config[
"skip_certificate_verification"
]
opts.hec_skip_certificate_verification = bool(
hec_config.getboolean("skip_certificate_verification", fallback=False)
)
if "kafka" in config.sections():
kafka_config = config["kafka"]
@@ -620,14 +618,14 @@ def _parse_config_file(config_file, opts):
if "forensic_topic" in kafka_config:
opts.kafka_forensic_topic = kafka_config["forensic_topic"]
else:
logger.critical(
raise ConfigurationError(
"forensic_topic setting missing from the kafka config section"
)
if "smtp_tls_topic" in kafka_config:
opts.kafka_smtp_tls_topic = kafka_config["smtp_tls_topic"]
else:
logger.critical(
"forensic_topic setting missing from the splunk_hec config section"
raise ConfigurationError(
"smtp_tls_topic setting missing from the kafka config section"
)
if "smtp" in config.sections():
@@ -749,15 +747,15 @@ def _parse_config_file(config_file, opts):
if "oauth2_port" in gmail_api_config:
opts.gmail_api_oauth2_port = gmail_api_config.getint("oauth2_port", 8080)
if "auth_mode" in gmail_api_config:
opts.gmail_api_auth_mode = gmail_api_config.get("auth_mode").strip()
opts.gmail_api_auth_mode = gmail_api_config["auth_mode"].strip()
if "service_account_user" in gmail_api_config:
opts.gmail_api_service_account_user = gmail_api_config.get(
opts.gmail_api_service_account_user = gmail_api_config[
"service_account_user"
).strip()
].strip()
elif "delegated_user" in gmail_api_config:
opts.gmail_api_service_account_user = gmail_api_config.get(
opts.gmail_api_service_account_user = gmail_api_config[
"delegated_user"
).strip()
].strip()
if "maildir" in config.sections():
maildir_api_config = config["maildir"]
@@ -812,6 +810,38 @@ def _parse_config_file(config_file, opts):
return index_prefix_domain_map
class _ElasticsearchHandle:
"""Sentinel so Elasticsearch participates in _close_output_clients."""
def close(self):
try:
conn = elastic.connections.get_connection()
if not isinstance(conn, str):
conn.close()
except Exception:
pass
try:
elastic.connections.remove_connection("default")
except Exception:
pass
class _OpenSearchHandle:
"""Sentinel so OpenSearch participates in _close_output_clients."""
def close(self):
try:
conn = opensearch.connections.get_connection()
if not isinstance(conn, str):
conn.close()
except Exception:
pass
try:
opensearch.connections.remove_connection("default")
except Exception:
pass
def _init_output_clients(opts):
"""Create output clients based on current opts.
@@ -823,76 +853,6 @@ def _init_output_clients(opts):
"""
clients = {}
if opts.save_aggregate or opts.save_forensic or opts.save_smtp_tls:
if opts.elasticsearch_hosts:
es_aggregate_index = "dmarc_aggregate"
es_forensic_index = "dmarc_forensic"
es_smtp_tls_index = "smtp_tls"
if opts.elasticsearch_index_suffix:
suffix = opts.elasticsearch_index_suffix
es_aggregate_index = "{0}_{1}".format(es_aggregate_index, suffix)
es_forensic_index = "{0}_{1}".format(es_forensic_index, suffix)
es_smtp_tls_index = "{0}_{1}".format(es_smtp_tls_index, suffix)
if opts.elasticsearch_index_prefix:
prefix = opts.elasticsearch_index_prefix
es_aggregate_index = "{0}{1}".format(prefix, es_aggregate_index)
es_forensic_index = "{0}{1}".format(prefix, es_forensic_index)
es_smtp_tls_index = "{0}{1}".format(prefix, es_smtp_tls_index)
elastic_timeout_value = (
float(opts.elasticsearch_timeout)
if opts.elasticsearch_timeout is not None
else 60.0
)
elastic.set_hosts(
opts.elasticsearch_hosts,
use_ssl=opts.elasticsearch_ssl,
ssl_cert_path=opts.elasticsearch_ssl_cert_path,
username=opts.elasticsearch_username,
password=opts.elasticsearch_password,
api_key=opts.elasticsearch_api_key,
timeout=elastic_timeout_value,
)
elastic.migrate_indexes(
aggregate_indexes=[es_aggregate_index],
forensic_indexes=[es_forensic_index],
)
if opts.opensearch_hosts:
os_aggregate_index = "dmarc_aggregate"
os_forensic_index = "dmarc_forensic"
os_smtp_tls_index = "smtp_tls"
if opts.opensearch_index_suffix:
suffix = opts.opensearch_index_suffix
os_aggregate_index = "{0}_{1}".format(os_aggregate_index, suffix)
os_forensic_index = "{0}_{1}".format(os_forensic_index, suffix)
os_smtp_tls_index = "{0}_{1}".format(os_smtp_tls_index, suffix)
if opts.opensearch_index_prefix:
prefix = opts.opensearch_index_prefix
os_aggregate_index = "{0}{1}".format(prefix, os_aggregate_index)
os_forensic_index = "{0}{1}".format(prefix, os_forensic_index)
os_smtp_tls_index = "{0}{1}".format(prefix, os_smtp_tls_index)
opensearch_timeout_value = (
float(opts.opensearch_timeout)
if opts.opensearch_timeout is not None
else 60.0
)
opensearch.set_hosts(
opts.opensearch_hosts,
use_ssl=opts.opensearch_ssl,
ssl_cert_path=opts.opensearch_ssl_cert_path,
username=opts.opensearch_username,
password=opts.opensearch_password,
api_key=opts.opensearch_api_key,
timeout=opensearch_timeout_value,
auth_type=opts.opensearch_auth_type,
aws_region=opts.opensearch_aws_region,
aws_service=opts.opensearch_aws_service,
)
opensearch.migrate_indexes(
aggregate_indexes=[os_aggregate_index],
forensic_indexes=[os_forensic_index],
)
if opts.s3_bucket:
clients["s3_client"] = s3.S3Client(
bucket_name=opts.s3_bucket,
@@ -965,6 +925,83 @@ def _init_output_clients(opts):
timeout=opts.webhook_timeout,
)
# Elasticsearch and OpenSearch mutate module-level global state via
# connections.create_connection(), which cannot be rolled back if a later
# step fails. Initialise them last so that all other clients are created
# successfully first; this minimises the window for partial-init problems
# during config reload.
if opts.save_aggregate or opts.save_forensic or opts.save_smtp_tls:
if opts.elasticsearch_hosts:
es_aggregate_index = "dmarc_aggregate"
es_forensic_index = "dmarc_forensic"
es_smtp_tls_index = "smtp_tls"
if opts.elasticsearch_index_suffix:
suffix = opts.elasticsearch_index_suffix
es_aggregate_index = "{0}_{1}".format(es_aggregate_index, suffix)
es_forensic_index = "{0}_{1}".format(es_forensic_index, suffix)
es_smtp_tls_index = "{0}_{1}".format(es_smtp_tls_index, suffix)
if opts.elasticsearch_index_prefix:
prefix = opts.elasticsearch_index_prefix
es_aggregate_index = "{0}{1}".format(prefix, es_aggregate_index)
es_forensic_index = "{0}{1}".format(prefix, es_forensic_index)
es_smtp_tls_index = "{0}{1}".format(prefix, es_smtp_tls_index)
elastic_timeout_value = (
float(opts.elasticsearch_timeout)
if opts.elasticsearch_timeout is not None
else 60.0
)
elastic.set_hosts(
opts.elasticsearch_hosts,
use_ssl=opts.elasticsearch_ssl,
ssl_cert_path=opts.elasticsearch_ssl_cert_path,
username=opts.elasticsearch_username,
password=opts.elasticsearch_password,
api_key=opts.elasticsearch_api_key,
timeout=elastic_timeout_value,
)
elastic.migrate_indexes(
aggregate_indexes=[es_aggregate_index],
forensic_indexes=[es_forensic_index],
)
clients["elasticsearch"] = _ElasticsearchHandle()
if opts.opensearch_hosts:
os_aggregate_index = "dmarc_aggregate"
os_forensic_index = "dmarc_forensic"
os_smtp_tls_index = "smtp_tls"
if opts.opensearch_index_suffix:
suffix = opts.opensearch_index_suffix
os_aggregate_index = "{0}_{1}".format(os_aggregate_index, suffix)
os_forensic_index = "{0}_{1}".format(os_forensic_index, suffix)
os_smtp_tls_index = "{0}_{1}".format(os_smtp_tls_index, suffix)
if opts.opensearch_index_prefix:
prefix = opts.opensearch_index_prefix
os_aggregate_index = "{0}{1}".format(prefix, os_aggregate_index)
os_forensic_index = "{0}{1}".format(prefix, os_forensic_index)
os_smtp_tls_index = "{0}{1}".format(prefix, os_smtp_tls_index)
opensearch_timeout_value = (
float(opts.opensearch_timeout)
if opts.opensearch_timeout is not None
else 60.0
)
opensearch.set_hosts(
opts.opensearch_hosts,
use_ssl=opts.opensearch_ssl,
ssl_cert_path=opts.opensearch_ssl_cert_path,
username=opts.opensearch_username,
password=opts.opensearch_password,
api_key=opts.opensearch_api_key,
timeout=opensearch_timeout_value,
auth_type=opts.opensearch_auth_type,
aws_region=opts.opensearch_aws_region,
aws_service=opts.opensearch_aws_service,
)
opensearch.migrate_indexes(
aggregate_indexes=[os_aggregate_index],
forensic_indexes=[os_forensic_index],
)
clients["opensearch"] = _OpenSearchHandle()
return clients
@@ -995,7 +1032,7 @@ def _main():
if "policy_published" in report:
domain = report["policy_published"]["domain"]
elif "reported_domain" in report:
domain = report("reported_domain")
domain = report["reported_domain"]
elif "policies" in report:
domain = report["policies"][0]["domain"]
if domain:
@@ -1578,7 +1615,6 @@ def _main():
normalize_timespan_threshold_hours=24.0,
fail_on_output_error=False,
)
args = arg_parser.parse_args()
# Snapshot opts as set from CLI args / hardcoded defaults, before any config
# file is applied. On each SIGHUP reload we restore this baseline first so
@@ -1591,7 +1627,7 @@ def _main():
try:
index_prefix_domain_map = _parse_config_file(args.config_file, opts)
except ConfigurationError as e:
logger.error(str(e))
logger.critical(str(e))
exit(-1)
logger.setLevel(logging.ERROR)
@@ -1613,6 +1649,8 @@ def _main():
except Exception as error:
logger.warning("Unable to write to log file: {}".format(error))
opts.active_log_file = opts.log_file
if (
opts.imap_host is None
and opts.graph_client_id is None
@@ -1628,14 +1666,14 @@ def _main():
# Initialize output clients
try:
clients = _init_output_clients(opts)
except elastic.ElasticsearchError:
logger.exception("Elasticsearch Error")
except elastic.ElasticsearchError as e:
logger.exception("Elasticsearch Error: {0}".format(e))
exit(1)
except opensearch.OpenSearchError:
logger.exception("OpenSearch Error")
except opensearch.OpenSearchError as e:
logger.exception("OpenSearch Error: {0}".format(e))
exit(1)
except ConfigurationError as e:
logger.error(str(e))
logger.critical(str(e))
exit(1)
except Exception as error_:
logger.error("Output client error: {0}".format(error_))
@@ -1771,8 +1809,9 @@ def _main():
try:
if opts.imap_user is None or opts.imap_password is None:
logger.error(
"IMAP user and password must be specified ifhost is specified"
"IMAP user and password must be specified if host is specified"
)
exit(1)
ssl = True
verify = True
@@ -1949,8 +1988,9 @@ def _main():
def _handle_sighup(signum, frame):
nonlocal _reload_requested
# Logging is not async-signal-safe; only set the flag here.
# The log message is emitted from the main loop when the flag is read.
_reload_requested = True
logger.info("SIGHUP received, config will reload after current batch")
if hasattr(signal, "SIGHUP"):
signal.signal(signal.SIGHUP, _handle_sighup)
@@ -1959,7 +1999,12 @@ def _main():
logger.info("Watching for email - Quit with ctrl-c")
while True:
_reload_requested = False
# Re-check mailbox_watch in case a config reload disabled watch mode
if not opts.mailbox_watch:
logger.info(
"Mailbox watch disabled in reloaded configuration, stopping watcher"
)
break
try:
watch_inbox(
mailbox_connection=mailbox_connection,
@@ -1980,7 +2025,7 @@ def _main():
reverse_dns_map_url=opts.reverse_dns_map_url,
offline=opts.offline,
normalize_timespan_threshold_hours=normalize_timespan_threshold_hours_value,
should_reload=lambda: _reload_requested,
config_reloading=lambda: _reload_requested,
)
except FileExistsError as error:
logger.error("{0}".format(error.__str__()))
@@ -1992,7 +2037,12 @@ def _main():
if not _reload_requested:
break
# Reload configuration
# Reload configuration — emit the log message here (not in the
# signal handler, which is not async-signal-safe), then clear the
# flag so that any new SIGHUP arriving while we reload will be
# captured for the next iteration rather than being silently dropped.
logger.info("SIGHUP received, config will reload after current batch")
_reload_requested = False
logger.info("Reloading configuration...")
try:
# Build a fresh opts starting from CLI-only defaults so that
@@ -2036,6 +2086,31 @@ def _main():
if opts.debug:
logger.setLevel(logging.DEBUG)
# Refresh FileHandler if log_file changed
old_log_file = getattr(opts, "active_log_file", None)
new_log_file = opts.log_file
if old_log_file != new_log_file:
# Remove old FileHandlers
for h in list(logger.handlers):
if isinstance(h, logging.FileHandler):
h.close()
logger.removeHandler(h)
# Add new FileHandler if configured
if new_log_file:
try:
fh = logging.FileHandler(new_log_file, "a")
file_formatter = logging.Formatter(
"%(asctime)s - %(levelname)s"
" - [%(filename)s:%(lineno)d] - %(message)s"
)
fh.setFormatter(file_formatter)
logger.addHandler(fh)
except Exception as log_error:
logger.warning(
"Unable to write to log file: {}".format(log_error)
)
opts.active_log_file = new_log_file
logger.info("Configuration reloaded successfully")
except Exception:
logger.exception(

View File

@@ -1,3 +1,3 @@
__version__ = "9.2.1"
__version__ = "9.3.0"
USER_AGENT = f"parsedmarc/{__version__}"

View File

@@ -3,9 +3,7 @@
from __future__ import annotations
import logging
import logging.handlers
import threading
from typing import Any
from pygelf import GelfTcpHandler, GelfTlsHandler, GelfUdpHandler
@@ -14,6 +12,7 @@ from parsedmarc import (
parsed_forensic_reports_to_csv_rows,
parsed_smtp_tls_reports_to_csv_rows,
)
from parsedmarc.types import AggregateReport, ForensicReport, SMTPTLSReport
log_context_data = threading.local()
@@ -37,7 +36,7 @@ class GelfClient(object):
"""
self.host = host
self.port = port
self.logger = logging.getLogger("parsedmarc_syslog")
self.logger = logging.getLogger("parsedmarc_gelf")
self.logger.setLevel(logging.INFO)
self.logger.addFilter(ContextFilter())
self.gelf_mode = {
@@ -50,7 +49,7 @@ class GelfClient(object):
)
self.logger.addHandler(self.handler)
def save_aggregate_report_to_gelf(self, aggregate_reports: list[dict[str, Any]]):
def save_aggregate_report_to_gelf(self, aggregate_reports: list[AggregateReport]):
rows = parsed_aggregate_reports_to_csv_rows(aggregate_reports)
for row in rows:
log_context_data.parsedmarc = row
@@ -58,13 +57,13 @@ class GelfClient(object):
log_context_data.parsedmarc = None
def save_forensic_report_to_gelf(self, forensic_reports: list[dict[str, Any]]):
def save_forensic_report_to_gelf(self, forensic_reports: list[ForensicReport]):
rows = parsed_forensic_reports_to_csv_rows(forensic_reports)
for row in rows:
log_context_data.parsedmarc = row
self.logger.info("parsedmarc forensic report")
def save_smtp_tls_report_to_gelf(self, smtp_tls_reports: dict[str, Any]):
def save_smtp_tls_report_to_gelf(self, smtp_tls_reports: SMTPTLSReport):
rows = parsed_smtp_tls_reports_to_csv_rows(smtp_tls_reports)
for row in rows:
log_context_data.parsedmarc = row

View File

@@ -175,13 +175,15 @@ class GmailConnection(MailboxConnection):
# Not needed
pass
def watch(self, check_callback, check_timeout, should_reload=None):
def watch(self, check_callback, check_timeout, config_reloading=None):
"""Checks the mailbox for new messages every n seconds"""
while True:
sleep(check_timeout)
check_callback(self)
if should_reload and should_reload():
if config_reloading and config_reloading():
return
sleep(check_timeout)
if config_reloading and config_reloading():
return
check_callback(self)
@lru_cache(maxsize=10)
def _find_label_id_for_label(self, label_name: str) -> str:

View File

@@ -278,13 +278,15 @@ class MSGraphConnection(MailboxConnection):
# Not needed
pass
def watch(self, check_callback, check_timeout, should_reload=None):
def watch(self, check_callback, check_timeout, config_reloading=None):
"""Checks the mailbox for new messages every n seconds"""
while True:
sleep(check_timeout)
check_callback(self)
if should_reload and should_reload():
if config_reloading and config_reloading():
return
sleep(check_timeout)
if config_reloading and config_reloading():
return
check_callback(self)
@lru_cache(maxsize=10)
def _find_folder_id_from_folder_path(self, folder_name: str) -> str:

View File

@@ -81,7 +81,7 @@ class IMAPConnection(MailboxConnection):
def keepalive(self):
self._client.noop()
def watch(self, check_callback, check_timeout, should_reload=None):
def watch(self, check_callback, check_timeout, config_reloading=None):
"""
Use an IDLE IMAP connection to parse incoming emails,
and pass the results to a callback function
@@ -94,6 +94,8 @@ class IMAPConnection(MailboxConnection):
check_callback(self)
while True:
if config_reloading and config_reloading():
return
try:
IMAPClient(
host=self._client.host,
@@ -111,5 +113,5 @@ class IMAPConnection(MailboxConnection):
except Exception as e:
logger.warning("IMAP connection error. {0}. Reconnecting...".format(e))
sleep(check_timeout)
if should_reload and should_reload():
if config_reloading and config_reloading():
return

View File

@@ -28,5 +28,5 @@ class MailboxConnection(ABC):
def keepalive(self):
raise NotImplementedError
def watch(self, check_callback, check_timeout, should_reload=None):
def watch(self, check_callback, check_timeout, config_reloading=None):
raise NotImplementedError

View File

@@ -63,12 +63,14 @@ class MaildirConnection(MailboxConnection):
def keepalive(self):
return
def watch(self, check_callback, check_timeout, should_reload=None):
def watch(self, check_callback, check_timeout, config_reloading=None):
while True:
if config_reloading and config_reloading():
return
try:
check_callback(self)
except Exception as e:
logger.warning("Maildir init error. {0}".format(e))
if should_reload and should_reload():
if config_reloading and config_reloading():
return
sleep(check_timeout)

View File

@@ -93,3 +93,11 @@ class S3Client(object):
self.bucket.put_object(
Body=json.dumps(report), Key=object_path, Metadata=object_metadata
)
def close(self):
"""Clean up the boto3 resource."""
try:
if self.s3.meta is not None:
self.s3.meta.client.close()
except Exception:
pass

View File

@@ -207,3 +207,7 @@ class HECClient(object):
raise SplunkError(e.__str__())
if response["code"] != 0:
raise SplunkError(response["text"])
def close(self):
"""Close the underlying HTTP session."""
self.session.close()

View File

@@ -4,6 +4,7 @@
from __future__ import absolute_import, print_function, unicode_literals
import os
import signal
import sys
import tempfile
import unittest
@@ -11,10 +12,11 @@ from base64 import urlsafe_b64encode
from glob import glob
from pathlib import Path
from tempfile import NamedTemporaryFile, TemporaryDirectory
from typing import cast
from types import SimpleNamespace
from unittest.mock import MagicMock, patch
from lxml import etree
from lxml import etree # type: ignore[import-untyped]
from googleapiclient.errors import HttpError
from httplib2 import Response
from imapclient.exceptions import IMAPClientError
@@ -31,6 +33,7 @@ from parsedmarc.mail.imap import IMAPConnection
import parsedmarc.mail.gmail as gmail_module
import parsedmarc.mail.graph as graph_module
import parsedmarc.mail.imap as imap_module
import parsedmarc.elastic
import parsedmarc.opensearch as opensearch_module
import parsedmarc.utils
@@ -153,7 +156,7 @@ class Test(unittest.TestCase):
report_path,
offline=True,
)
self.assertEqual(result["report_type"], "aggregate")
assert result["report_type"] == "aggregate"
self.assertEqual(result["report"]["report_metadata"]["org_name"], "outlook.com")
def testParseReportFileAcceptsPathForEmail(self):
@@ -164,7 +167,7 @@ class Test(unittest.TestCase):
report_path,
offline=True,
)
self.assertEqual(result["report_type"], "aggregate")
assert result["report_type"] == "aggregate"
self.assertEqual(result["report"]["report_metadata"]["org_name"], "google.com")
def testAggregateSamples(self):
@@ -175,10 +178,11 @@ class Test(unittest.TestCase):
if os.path.isdir(sample_path):
continue
print("Testing {0}: ".format(sample_path), end="")
parsed_report = parsedmarc.parse_report_file(
result = parsedmarc.parse_report_file(
sample_path, always_use_local_files=True, offline=OFFLINE_MODE
)["report"]
parsedmarc.parsed_aggregate_reports_to_csv(parsed_report)
)
assert result["report_type"] == "aggregate"
parsedmarc.parsed_aggregate_reports_to_csv(result["report"])
print("Passed!")
def testEmptySample(self):
@@ -194,13 +198,13 @@ class Test(unittest.TestCase):
print("Testing {0}: ".format(sample_path), end="")
with open(sample_path) as sample_file:
sample_content = sample_file.read()
parsed_report = parsedmarc.parse_report_email(
email_result = parsedmarc.parse_report_email(
sample_content, offline=OFFLINE_MODE
)["report"]
parsed_report = parsedmarc.parse_report_file(
sample_path, offline=OFFLINE_MODE
)["report"]
parsedmarc.parsed_forensic_reports_to_csv(parsed_report)
)
assert email_result["report_type"] == "forensic"
result = parsedmarc.parse_report_file(sample_path, offline=OFFLINE_MODE)
assert result["report_type"] == "forensic"
parsedmarc.parsed_forensic_reports_to_csv(result["report"])
print("Passed!")
def testSmtpTlsSamples(self):
@@ -211,10 +215,9 @@ class Test(unittest.TestCase):
if os.path.isdir(sample_path):
continue
print("Testing {0}: ".format(sample_path), end="")
parsed_report = parsedmarc.parse_report_file(
sample_path, offline=OFFLINE_MODE
)["report"]
parsedmarc.parsed_smtp_tls_reports_to_csv(parsed_report)
result = parsedmarc.parse_report_file(sample_path, offline=OFFLINE_MODE)
assert result["report_type"] == "smtp_tls"
parsedmarc.parsed_smtp_tls_reports_to_csv(result["report"])
print("Passed!")
def testOpenSearchSigV4RequiresRegion(self):
@@ -1277,7 +1280,7 @@ class TestMailboxWatchSince(unittest.TestCase):
def testWatchInboxPassesSinceToMailboxFetch(self):
mailbox_connection = SimpleNamespace()
def fake_watch(check_callback, check_timeout, should_reload=None):
def fake_watch(check_callback, check_timeout, config_reloading=None):
check_callback(mailbox_connection)
raise _BreakLoop()
@@ -1288,7 +1291,9 @@ class TestMailboxWatchSince(unittest.TestCase):
) as mocked:
with self.assertRaises(_BreakLoop):
parsedmarc.watch_inbox(
mailbox_connection=mailbox_connection,
mailbox_connection=cast(
parsedmarc.MailboxConnection, mailbox_connection
),
callback=callback,
check_timeout=1,
batch_size=10,
@@ -1336,30 +1341,30 @@ since = 2d
self.assertEqual(mock_watch_inbox.call_args.kwargs.get("since"), "2d")
class _DummyMailboxConnection:
class _DummyMailboxConnection(parsedmarc.MailboxConnection):
def __init__(self):
self.fetch_calls = []
self.fetch_calls: list[dict[str, object]] = []
def create_folder(self, folder_name):
def create_folder(self, folder_name: str):
return None
def fetch_messages(self, reports_folder, **kwargs):
def fetch_messages(self, reports_folder: str, **kwargs):
self.fetch_calls.append({"reports_folder": reports_folder, **kwargs})
return []
def fetch_message(self, message_id, **kwargs):
def fetch_message(self, message_id) -> str:
return ""
def delete_message(self, message_id):
return None
def move_message(self, message_id, folder_name):
def move_message(self, message_id, folder_name: str):
return None
def keepalive(self):
return None
def watch(self, check_callback, check_timeout):
def watch(self, check_callback, check_timeout, config_reloading=None):
return None
@@ -1445,7 +1450,7 @@ mailbox = shared@example.com
parsedmarc.cli._main()
self.assertEqual(system_exit.exception.code, -1)
mock_logger.error.assert_called_once_with(
mock_logger.critical.assert_called_once_with(
"certificate_path setting missing from the msgraph config section"
)
mock_graph_connection.assert_not_called()
@@ -1517,7 +1522,7 @@ user = owner@example.com
parsedmarc.cli._main()
self.assertEqual(system_exit.exception.code, -1)
mock_logger.error.assert_called_once_with(
mock_logger.critical.assert_called_once_with(
"password setting missing from the msgraph config section"
)
mock_graph_connection.assert_not_called()
@@ -1558,7 +1563,7 @@ class TestMSGraphFolderFallback(unittest.TestCase):
def testWellKnownFolderFallback(self):
connection = MSGraphConnection.__new__(MSGraphConnection)
connection.mailbox_name = "shared@example.com"
connection._client = _FakeGraphClient()
connection._client = _FakeGraphClient() # type: ignore[assignment]
connection._request_with_retries = MagicMock(
side_effect=lambda method_name, *args, **kwargs: getattr(
connection._client, method_name
@@ -1578,7 +1583,7 @@ class TestMSGraphFolderFallback(unittest.TestCase):
def testUnknownFolderStillFails(self):
connection = MSGraphConnection.__new__(MSGraphConnection)
connection.mailbox_name = "shared@example.com"
connection._client = _FakeGraphClient()
connection._client = _FakeGraphClient() # type: ignore[assignment]
connection._request_with_retries = MagicMock(
side_effect=lambda method_name, *args, **kwargs: getattr(
connection._client, method_name
@@ -1674,7 +1679,7 @@ mailbox = shared@example.com
parsedmarc.cli._main()
self.assertEqual(system_exit.exception.code, -1)
mock_logger.error.assert_called_once_with(
mock_logger.critical.assert_called_once_with(
"client_secret setting missing from the msgraph config section"
)
mock_graph_connection.assert_not_called()
@@ -1706,7 +1711,7 @@ mailbox = shared@example.com
parsedmarc.cli._main()
self.assertEqual(system_exit.exception.code, -1)
mock_logger.error.assert_called_once_with(
mock_logger.critical.assert_called_once_with(
"tenant_id setting missing from the msgraph config section"
)
mock_graph_connection.assert_not_called()
@@ -1738,7 +1743,7 @@ tenant_id = tenant-id
parsedmarc.cli._main()
self.assertEqual(system_exit.exception.code, -1)
mock_logger.error.assert_called_once_with(
mock_logger.critical.assert_called_once_with(
"mailbox setting missing from the msgraph config section"
)
mock_graph_connection.assert_not_called()
@@ -1808,7 +1813,7 @@ mailbox = shared@example.com
parsedmarc.cli._main()
self.assertEqual(system_exit.exception.code, -1)
mock_logger.error.assert_called_once_with(
mock_logger.critical.assert_called_once_with(
"tenant_id setting missing from the msgraph config section"
)
mock_graph_connection.assert_not_called()
@@ -1839,7 +1844,7 @@ tenant_id = tenant-id
parsedmarc.cli._main()
self.assertEqual(system_exit.exception.code, -1)
mock_logger.error.assert_called_once_with(
mock_logger.critical.assert_called_once_with(
"mailbox setting missing from the msgraph config section"
)
mock_graph_connection.assert_not_called()
@@ -1871,7 +1876,7 @@ certificate_path = /tmp/msgraph-cert.pem
parsedmarc.cli._main()
self.assertEqual(system_exit.exception.code, -1)
mock_logger.error.assert_called_once_with(
mock_logger.critical.assert_called_once_with(
"tenant_id setting missing from the msgraph config section"
)
mock_graph_connection.assert_not_called()
@@ -1903,7 +1908,7 @@ certificate_path = /tmp/msgraph-cert.pem
parsedmarc.cli._main()
self.assertEqual(system_exit.exception.code, -1)
mock_logger.error.assert_called_once_with(
mock_logger.critical.assert_called_once_with(
"mailbox setting missing from the msgraph config section"
)
mock_graph_connection.assert_not_called()
@@ -1925,6 +1930,10 @@ password = pass
watch = true
"""
@unittest.skipUnless(
hasattr(signal, "SIGHUP"),
"SIGHUP not available on this platform",
)
@patch("parsedmarc.cli._init_output_clients")
@patch("parsedmarc.cli._parse_config_file")
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
@@ -1990,6 +1999,10 @@ watch = true
# _parse_config_file called for initial load + reload
self.assertGreaterEqual(mock_parse_config.call_count, 2)
@unittest.skipUnless(
hasattr(signal, "SIGHUP"),
"SIGHUP not available on this platform",
)
@patch("parsedmarc.cli._init_output_clients")
@patch("parsedmarc.cli._parse_config_file")
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
@@ -2062,6 +2075,10 @@ watch = true
# The failed reload must not have closed the original clients
initial_clients["s3_client"].close.assert_not_called()
@unittest.skipUnless(
hasattr(signal, "SIGHUP"),
"SIGHUP not available on this platform",
)
@patch("parsedmarc.cli._init_output_clients")
@patch("parsedmarc.cli._parse_config_file")
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
@@ -2133,6 +2150,10 @@ watch = true
# Old client must have been closed when reload succeeded
old_client.close.assert_called_once()
@unittest.skipUnless(
hasattr(signal, "SIGHUP"),
"SIGHUP not available on this platform",
)
@patch("parsedmarc.cli._init_output_clients")
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
@patch("parsedmarc.cli.watch_inbox")