mirror of
https://github.com/domainaware/parsedmarc.git
synced 2026-03-21 05:55:59 +00:00
Compare commits
2 Commits
config-rel
...
copilot/su
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3509e5c553 | ||
|
|
964a1baeed |
@@ -7,8 +7,7 @@
|
||||
"Bash(GITHUB_ACTIONS=true pytest --cov tests.py)",
|
||||
"Bash(ls tests*)",
|
||||
"Bash(GITHUB_ACTIONS=true python -m pytest --cov tests.py -x)",
|
||||
"Bash(GITHUB_ACTIONS=true python -m pytest tests.py -x -v)",
|
||||
"Bash(python -m pytest tests.py --no-header -q)"
|
||||
"Bash(GITHUB_ACTIONS=true python -m pytest tests.py -x -v)"
|
||||
],
|
||||
"additionalDirectories": [
|
||||
"/tmp"
|
||||
|
||||
29
CHANGELOG.md
29
CHANGELOG.md
@@ -8,38 +8,11 @@
|
||||
destinations, DNS/GeoIP settings, processing flags, and log level
|
||||
without restarting the service or interrupting in-progress report
|
||||
processing. Use `systemctl reload parsedmarc` when running under
|
||||
systemd. On a successful reload, old output clients are closed and
|
||||
recreated. On a failed reload, the previous configuration remains
|
||||
fully active.
|
||||
- `close()` methods on GelfClient, KafkaClient, SyslogClient,
|
||||
WebhookClient, HECClient, and S3Client for clean resource teardown
|
||||
on reload.
|
||||
- `should_reload` parameter on all `MailboxConnection.watch()`
|
||||
implementations and `watch_inbox()` to ensure SIGHUP never triggers
|
||||
a new email batch mid-reload.
|
||||
- Elasticsearch and OpenSearch connections are now tracked and cleaned
|
||||
up on reload via `_close_output_clients()`.
|
||||
systemd.
|
||||
- Extracted `_parse_config_file()` and `_init_output_clients()` from
|
||||
`_main()` in `cli.py` to support config reload and reduce code
|
||||
duplication.
|
||||
|
||||
### Fixed
|
||||
|
||||
- `get_index_prefix()` crashed on forensic reports with `TypeError`
|
||||
due to `report()` instead of `report[]` dict access.
|
||||
- Missing `exit(1)` after IMAP user/password validation failure
|
||||
allowed execution to continue with `None` credentials.
|
||||
- IMAP `watch()` leaked a connection on every IDLE cycle by not
|
||||
closing the old `IMAPClient` before replacing it.
|
||||
- Resource leak in `_init_output_clients()` when Splunk HEC
|
||||
configuration is invalid — the partially-constructed HEC client
|
||||
is now cleaned up on error.
|
||||
- Elasticsearch/OpenSearch `set_hosts()` global state was not
|
||||
rollback-safe on reload failure — init now runs last so other
|
||||
client failures don't leave stale global connections.
|
||||
- `active_log_file` was not initialized at startup, causing the
|
||||
first reload to unnecessarily remove and re-add the FileHandler.
|
||||
|
||||
## 9.2.1
|
||||
|
||||
### Added
|
||||
|
||||
@@ -666,15 +666,8 @@ Mailbox connection settings (IMAP host/credentials, Microsoft Graph,
|
||||
Gmail API, Maildir path) are **not** reloaded — changing those still
|
||||
requires a full restart.
|
||||
|
||||
On a **successful** reload, existing output client connections are
|
||||
closed and new ones are created from the updated configuration. The
|
||||
service then resumes watching with the new settings.
|
||||
|
||||
If the new configuration file contains errors (missing required
|
||||
settings, unreachable output destinations, etc.), the **entire reload
|
||||
is aborted** — no output clients are replaced and the previous
|
||||
configuration remains fully active. This means a typo in one section
|
||||
will not take down an otherwise working setup. Check the logs for
|
||||
If the new configuration file contains errors, the reload is aborted
|
||||
and the previous configuration remains active. Check the logs for
|
||||
details:
|
||||
|
||||
```bash
|
||||
|
||||
@@ -201,8 +201,21 @@ def _parse_config_file(config_file, opts):
|
||||
"normalize_timespan_threshold_hours"
|
||||
)
|
||||
if "index_prefix_domain_map" in general_config:
|
||||
with open(general_config["index_prefix_domain_map"]) as f:
|
||||
index_prefix_domain_map = yaml.safe_load(f)
|
||||
map_path = general_config["index_prefix_domain_map"]
|
||||
try:
|
||||
with open(map_path) as f:
|
||||
index_prefix_domain_map = yaml.safe_load(f)
|
||||
except OSError as exc:
|
||||
raise ConfigurationError(
|
||||
"Failed to read index_prefix_domain_map file '{0}': {1}".format(
|
||||
map_path, exc
|
||||
)
|
||||
) from exc
|
||||
except yaml.YAMLError as exc:
|
||||
raise ConfigurationError(
|
||||
"Failed to parse YAML in index_prefix_domain_map "
|
||||
"file '{0}': {1}".format(map_path, exc)
|
||||
) from exc
|
||||
if "offline" in general_config:
|
||||
opts.offline = bool(general_config.getboolean("offline"))
|
||||
if "strip_attachment_payloads" in general_config:
|
||||
@@ -747,15 +760,15 @@ def _parse_config_file(config_file, opts):
|
||||
if "oauth2_port" in gmail_api_config:
|
||||
opts.gmail_api_oauth2_port = gmail_api_config.getint("oauth2_port", 8080)
|
||||
if "auth_mode" in gmail_api_config:
|
||||
opts.gmail_api_auth_mode = gmail_api_config["auth_mode"].strip()
|
||||
opts.gmail_api_auth_mode = gmail_api_config.get("auth_mode").strip()
|
||||
if "service_account_user" in gmail_api_config:
|
||||
opts.gmail_api_service_account_user = gmail_api_config[
|
||||
opts.gmail_api_service_account_user = gmail_api_config.get(
|
||||
"service_account_user"
|
||||
].strip()
|
||||
).strip()
|
||||
elif "delegated_user" in gmail_api_config:
|
||||
opts.gmail_api_service_account_user = gmail_api_config[
|
||||
opts.gmail_api_service_account_user = gmail_api_config.get(
|
||||
"delegated_user"
|
||||
].strip()
|
||||
).strip()
|
||||
|
||||
if "maildir" in config.sections():
|
||||
maildir_api_config = config["maildir"]
|
||||
@@ -810,26 +823,6 @@ def _parse_config_file(config_file, opts):
|
||||
return index_prefix_domain_map
|
||||
|
||||
|
||||
class _ElasticsearchHandle:
|
||||
"""Sentinel so Elasticsearch participates in _close_output_clients."""
|
||||
|
||||
def close(self):
|
||||
try:
|
||||
elastic.connections.remove_connection("default")
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
class _OpenSearchHandle:
|
||||
"""Sentinel so OpenSearch participates in _close_output_clients."""
|
||||
|
||||
def close(self):
|
||||
try:
|
||||
opensearch.connections.remove_connection("default")
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
def _init_output_clients(opts):
|
||||
"""Create output clients based on current opts.
|
||||
|
||||
@@ -839,85 +832,15 @@ def _init_output_clients(opts):
|
||||
Raises:
|
||||
ConfigurationError: If a required output client cannot be created.
|
||||
"""
|
||||
# Validate all required settings before creating any clients so that a
|
||||
# ConfigurationError does not leave partially-created clients un-closed.
|
||||
if opts.hec and (opts.hec_token is None or opts.hec_index is None):
|
||||
raise ConfigurationError(
|
||||
"HEC token and HEC index are required when using HEC URL"
|
||||
)
|
||||
|
||||
clients = {}
|
||||
|
||||
if opts.s3_bucket:
|
||||
clients["s3_client"] = s3.S3Client(
|
||||
bucket_name=opts.s3_bucket,
|
||||
bucket_path=opts.s3_path,
|
||||
region_name=opts.s3_region_name,
|
||||
endpoint_url=opts.s3_endpoint_url,
|
||||
access_key_id=opts.s3_access_key_id,
|
||||
secret_access_key=opts.s3_secret_access_key,
|
||||
)
|
||||
|
||||
if opts.syslog_server:
|
||||
clients["syslog_client"] = syslog.SyslogClient(
|
||||
server_name=opts.syslog_server,
|
||||
server_port=int(opts.syslog_port),
|
||||
protocol=opts.syslog_protocol or "udp",
|
||||
cafile_path=opts.syslog_cafile_path,
|
||||
certfile_path=opts.syslog_certfile_path,
|
||||
keyfile_path=opts.syslog_keyfile_path,
|
||||
timeout=opts.syslog_timeout if opts.syslog_timeout is not None else 5.0,
|
||||
retry_attempts=opts.syslog_retry_attempts
|
||||
if opts.syslog_retry_attempts is not None
|
||||
else 3,
|
||||
retry_delay=opts.syslog_retry_delay
|
||||
if opts.syslog_retry_delay is not None
|
||||
else 5,
|
||||
)
|
||||
|
||||
if opts.hec:
|
||||
if opts.hec_token is None or opts.hec_index is None:
|
||||
raise ConfigurationError(
|
||||
"HEC token and HEC index are required when using HEC URL"
|
||||
)
|
||||
verify = True
|
||||
if opts.hec_skip_certificate_verification:
|
||||
verify = False
|
||||
clients["hec_client"] = splunk.HECClient(
|
||||
opts.hec, opts.hec_token, opts.hec_index, verify=verify
|
||||
)
|
||||
|
||||
if opts.kafka_hosts:
|
||||
ssl_context = None
|
||||
if opts.kafka_skip_certificate_verification:
|
||||
logger.debug("Skipping Kafka certificate verification")
|
||||
ssl_context = create_default_context()
|
||||
ssl_context.check_hostname = False
|
||||
ssl_context.verify_mode = CERT_NONE
|
||||
clients["kafka_client"] = kafkaclient.KafkaClient(
|
||||
opts.kafka_hosts,
|
||||
username=opts.kafka_username,
|
||||
password=opts.kafka_password,
|
||||
ssl_context=ssl_context,
|
||||
)
|
||||
|
||||
if opts.gelf_host:
|
||||
clients["gelf_client"] = gelf.GelfClient(
|
||||
host=opts.gelf_host,
|
||||
port=int(opts.gelf_port),
|
||||
mode=opts.gelf_mode,
|
||||
)
|
||||
|
||||
if (
|
||||
opts.webhook_aggregate_url
|
||||
or opts.webhook_forensic_url
|
||||
or opts.webhook_smtp_tls_url
|
||||
):
|
||||
clients["webhook_client"] = webhook.WebhookClient(
|
||||
aggregate_url=opts.webhook_aggregate_url,
|
||||
forensic_url=opts.webhook_forensic_url,
|
||||
smtp_tls_url=opts.webhook_smtp_tls_url,
|
||||
timeout=opts.webhook_timeout,
|
||||
)
|
||||
|
||||
# Elasticsearch and OpenSearch mutate module-level global state via
|
||||
# connections.create_connection(), which cannot be rolled back if a later
|
||||
# step fails. Initialise them last so that all other clients are created
|
||||
# successfully first; this minimises the window for partial-init problems
|
||||
# during config reload.
|
||||
if opts.save_aggregate or opts.save_forensic or opts.save_smtp_tls:
|
||||
if opts.elasticsearch_hosts:
|
||||
es_aggregate_index = "dmarc_aggregate"
|
||||
@@ -951,7 +874,6 @@ def _init_output_clients(opts):
|
||||
aggregate_indexes=[es_aggregate_index],
|
||||
forensic_indexes=[es_forensic_index],
|
||||
)
|
||||
clients["elasticsearch"] = _ElasticsearchHandle()
|
||||
|
||||
if opts.opensearch_hosts:
|
||||
os_aggregate_index = "dmarc_aggregate"
|
||||
@@ -988,7 +910,100 @@ def _init_output_clients(opts):
|
||||
aggregate_indexes=[os_aggregate_index],
|
||||
forensic_indexes=[os_forensic_index],
|
||||
)
|
||||
clients["opensearch"] = _OpenSearchHandle()
|
||||
|
||||
if opts.s3_bucket:
|
||||
try:
|
||||
clients["s3_client"] = s3.S3Client(
|
||||
bucket_name=opts.s3_bucket,
|
||||
bucket_path=opts.s3_path,
|
||||
region_name=opts.s3_region_name,
|
||||
endpoint_url=opts.s3_endpoint_url,
|
||||
access_key_id=opts.s3_access_key_id,
|
||||
secret_access_key=opts.s3_secret_access_key,
|
||||
)
|
||||
except Exception:
|
||||
logger.warning("Failed to initialize S3 client; skipping", exc_info=True)
|
||||
|
||||
if opts.syslog_server:
|
||||
try:
|
||||
clients["syslog_client"] = syslog.SyslogClient(
|
||||
server_name=opts.syslog_server,
|
||||
server_port=int(opts.syslog_port),
|
||||
protocol=opts.syslog_protocol or "udp",
|
||||
cafile_path=opts.syslog_cafile_path,
|
||||
certfile_path=opts.syslog_certfile_path,
|
||||
keyfile_path=opts.syslog_keyfile_path,
|
||||
timeout=opts.syslog_timeout if opts.syslog_timeout is not None else 5.0,
|
||||
retry_attempts=opts.syslog_retry_attempts
|
||||
if opts.syslog_retry_attempts is not None
|
||||
else 3,
|
||||
retry_delay=opts.syslog_retry_delay
|
||||
if opts.syslog_retry_delay is not None
|
||||
else 5,
|
||||
)
|
||||
except Exception:
|
||||
logger.warning(
|
||||
"Failed to initialize syslog client; skipping", exc_info=True
|
||||
)
|
||||
|
||||
if opts.hec:
|
||||
verify = True
|
||||
if opts.hec_skip_certificate_verification:
|
||||
verify = False
|
||||
try:
|
||||
clients["hec_client"] = splunk.HECClient(
|
||||
opts.hec, opts.hec_token, opts.hec_index, verify=verify
|
||||
)
|
||||
except Exception:
|
||||
logger.warning(
|
||||
"Failed to initialize Splunk HEC client; skipping", exc_info=True
|
||||
)
|
||||
|
||||
if opts.kafka_hosts:
|
||||
ssl_context = None
|
||||
if opts.kafka_ssl:
|
||||
ssl_context = create_default_context()
|
||||
if opts.kafka_skip_certificate_verification:
|
||||
logger.debug("Skipping Kafka certificate verification")
|
||||
ssl_context.check_hostname = False
|
||||
ssl_context.verify_mode = CERT_NONE
|
||||
try:
|
||||
clients["kafka_client"] = kafkaclient.KafkaClient(
|
||||
opts.kafka_hosts,
|
||||
username=opts.kafka_username,
|
||||
password=opts.kafka_password,
|
||||
ssl=opts.kafka_ssl,
|
||||
ssl_context=ssl_context,
|
||||
)
|
||||
except Exception:
|
||||
logger.warning("Failed to initialize Kafka client; skipping", exc_info=True)
|
||||
|
||||
if opts.gelf_host:
|
||||
try:
|
||||
clients["gelf_client"] = gelf.GelfClient(
|
||||
host=opts.gelf_host,
|
||||
port=int(opts.gelf_port),
|
||||
mode=opts.gelf_mode,
|
||||
)
|
||||
except Exception:
|
||||
logger.warning("Failed to initialize GELF client; skipping", exc_info=True)
|
||||
|
||||
if (
|
||||
opts.webhook_aggregate_url
|
||||
or opts.webhook_forensic_url
|
||||
or opts.webhook_smtp_tls_url
|
||||
):
|
||||
try:
|
||||
clients["webhook_client"] = webhook.WebhookClient(
|
||||
aggregate_url=opts.webhook_aggregate_url,
|
||||
forensic_url=opts.webhook_forensic_url,
|
||||
smtp_tls_url=opts.webhook_smtp_tls_url,
|
||||
timeout=opts.webhook_timeout,
|
||||
)
|
||||
except Exception:
|
||||
logger.warning(
|
||||
"Failed to initialize webhook client; skipping", exc_info=True
|
||||
)
|
||||
|
||||
return clients
|
||||
|
||||
@@ -1020,7 +1035,7 @@ def _main():
|
||||
if "policy_published" in report:
|
||||
domain = report["policy_published"]["domain"]
|
||||
elif "reported_domain" in report:
|
||||
domain = report["reported_domain"]
|
||||
domain = report("reported_domain")
|
||||
elif "policies" in report:
|
||||
domain = report["policies"][0]["domain"]
|
||||
if domain:
|
||||
@@ -1637,8 +1652,6 @@ def _main():
|
||||
except Exception as error:
|
||||
logger.warning("Unable to write to log file: {}".format(error))
|
||||
|
||||
opts.active_log_file = opts.log_file
|
||||
|
||||
if (
|
||||
opts.imap_host is None
|
||||
and opts.graph_client_id is None
|
||||
@@ -1797,9 +1810,8 @@ def _main():
|
||||
try:
|
||||
if opts.imap_user is None or opts.imap_password is None:
|
||||
logger.error(
|
||||
"IMAP user and password must be specified if host is specified"
|
||||
"IMAP user and password must be specified ifhost is specified"
|
||||
)
|
||||
exit(1)
|
||||
|
||||
ssl = True
|
||||
verify = True
|
||||
@@ -1976,9 +1988,8 @@ def _main():
|
||||
|
||||
def _handle_sighup(signum, frame):
|
||||
nonlocal _reload_requested
|
||||
# Logging is not async-signal-safe; only set the flag here.
|
||||
# The log message is emitted from the main loop when the flag is read.
|
||||
_reload_requested = True
|
||||
logger.info("SIGHUP received, config will reload after current batch")
|
||||
|
||||
if hasattr(signal, "SIGHUP"):
|
||||
signal.signal(signal.SIGHUP, _handle_sighup)
|
||||
@@ -2025,11 +2036,9 @@ def _main():
|
||||
if not _reload_requested:
|
||||
break
|
||||
|
||||
# Reload configuration — emit the log message here (not in the
|
||||
# signal handler, which is not async-signal-safe), then clear the
|
||||
# flag so that any new SIGHUP arriving while we reload will be
|
||||
# captured for the next iteration rather than being silently dropped.
|
||||
logger.info("SIGHUP received, config will reload after current batch")
|
||||
# Reload configuration — clear the flag first so that any new
|
||||
# SIGHUP arriving while we reload will be captured for the next
|
||||
# iteration rather than being silently dropped.
|
||||
_reload_requested = False
|
||||
logger.info("Reloading configuration...")
|
||||
try:
|
||||
|
||||
@@ -284,8 +284,6 @@ class MSGraphConnection(MailboxConnection):
|
||||
if should_reload and should_reload():
|
||||
return
|
||||
sleep(check_timeout)
|
||||
if should_reload and should_reload():
|
||||
return
|
||||
check_callback(self)
|
||||
|
||||
@lru_cache(maxsize=10)
|
||||
|
||||
@@ -90,17 +90,10 @@ class IMAPConnection(MailboxConnection):
|
||||
# IDLE callback sends IMAPClient object,
|
||||
# send back the imap connection object instead
|
||||
def idle_callback_wrapper(client: IMAPClient):
|
||||
old_client = self._client
|
||||
self._client = client
|
||||
try:
|
||||
old_client.logout()
|
||||
except Exception:
|
||||
pass
|
||||
check_callback(self)
|
||||
|
||||
while True:
|
||||
if should_reload and should_reload():
|
||||
return
|
||||
try:
|
||||
IMAPClient(
|
||||
host=self._client.host,
|
||||
|
||||
@@ -65,8 +65,6 @@ class MaildirConnection(MailboxConnection):
|
||||
|
||||
def watch(self, check_callback, check_timeout, should_reload=None):
|
||||
while True:
|
||||
if should_reload and should_reload():
|
||||
return
|
||||
try:
|
||||
check_callback(self)
|
||||
except Exception as e:
|
||||
|
||||
@@ -93,11 +93,3 @@ class S3Client(object):
|
||||
self.bucket.put_object(
|
||||
Body=json.dumps(report), Key=object_path, Metadata=object_metadata
|
||||
)
|
||||
|
||||
def close(self):
|
||||
"""Clean up the boto3 resource."""
|
||||
try:
|
||||
if self.s3.meta is not None:
|
||||
self.s3.meta.client.close()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
@@ -207,7 +207,3 @@ class HECClient(object):
|
||||
raise SplunkError(e.__str__())
|
||||
if response["code"] != 0:
|
||||
raise SplunkError(response["text"])
|
||||
|
||||
def close(self):
|
||||
"""Close the underlying HTTP session."""
|
||||
self.session.close()
|
||||
|
||||
20
tests.py
20
tests.py
@@ -1926,10 +1926,7 @@ password = pass
|
||||
watch = true
|
||||
"""
|
||||
|
||||
@unittest.skipUnless(
|
||||
hasattr(signal, "SIGHUP"),
|
||||
"SIGHUP not available on this platform",
|
||||
)
|
||||
@unittest.skipUnless(hasattr(signal, "SIGHUP"), "SIGHUP not available on this platform")
|
||||
@patch("parsedmarc.cli._init_output_clients")
|
||||
@patch("parsedmarc.cli._parse_config_file")
|
||||
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
|
||||
@@ -1995,10 +1992,7 @@ watch = true
|
||||
# _parse_config_file called for initial load + reload
|
||||
self.assertGreaterEqual(mock_parse_config.call_count, 2)
|
||||
|
||||
@unittest.skipUnless(
|
||||
hasattr(signal, "SIGHUP"),
|
||||
"SIGHUP not available on this platform",
|
||||
)
|
||||
@unittest.skipUnless(hasattr(signal, "SIGHUP"), "SIGHUP not available on this platform")
|
||||
@patch("parsedmarc.cli._init_output_clients")
|
||||
@patch("parsedmarc.cli._parse_config_file")
|
||||
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
|
||||
@@ -2071,10 +2065,7 @@ watch = true
|
||||
# The failed reload must not have closed the original clients
|
||||
initial_clients["s3_client"].close.assert_not_called()
|
||||
|
||||
@unittest.skipUnless(
|
||||
hasattr(signal, "SIGHUP"),
|
||||
"SIGHUP not available on this platform",
|
||||
)
|
||||
@unittest.skipUnless(hasattr(signal, "SIGHUP"), "SIGHUP not available on this platform")
|
||||
@patch("parsedmarc.cli._init_output_clients")
|
||||
@patch("parsedmarc.cli._parse_config_file")
|
||||
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
|
||||
@@ -2146,10 +2137,7 @@ watch = true
|
||||
# Old client must have been closed when reload succeeded
|
||||
old_client.close.assert_called_once()
|
||||
|
||||
@unittest.skipUnless(
|
||||
hasattr(signal, "SIGHUP"),
|
||||
"SIGHUP not available on this platform",
|
||||
)
|
||||
@unittest.skipUnless(hasattr(signal, "SIGHUP"), "SIGHUP not available on this platform")
|
||||
@patch("parsedmarc.cli._init_output_clients")
|
||||
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
|
||||
@patch("parsedmarc.cli.watch_inbox")
|
||||
|
||||
Reference in New Issue
Block a user