Compare commits

..

2 Commits

Author SHA1 Message Date
copilot-swe-agent[bot]
afae6c3232 Fix reload state consistency, resource leaks, stale opts; add tests
Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
Agent-Logs-Url: https://github.com/domainaware/parsedmarc/sessions/3c2e0bb9-7e2d-4efa-aef6-d2b98478b921
2026-03-20 19:39:55 +00:00
copilot-swe-agent[bot]
043c95215c Initial plan 2026-03-20 19:29:33 +00:00
5 changed files with 73 additions and 155 deletions

View File

@@ -2249,14 +2249,11 @@ def watch_inbox(
)
callback(res)
watch_kwargs: dict = {
"check_callback": check_callback,
"check_timeout": check_timeout,
}
if should_reload is not None:
watch_kwargs["should_reload"] = should_reload
mailbox_connection.watch(**watch_kwargs)
mailbox_connection.watch(
check_callback=check_callback,
check_timeout=check_timeout,
should_reload=should_reload,
)
def append_json(

View File

@@ -201,21 +201,8 @@ def _parse_config_file(config_file, opts):
"normalize_timespan_threshold_hours"
)
if "index_prefix_domain_map" in general_config:
map_path = general_config["index_prefix_domain_map"]
try:
with open(map_path) as f:
index_prefix_domain_map = yaml.safe_load(f)
except OSError as exc:
raise ConfigurationError(
"Failed to read index_prefix_domain_map file '{0}': {1}".format(
map_path, exc
)
) from exc
except yaml.YAMLError as exc:
raise ConfigurationError(
"Failed to parse YAML in index_prefix_domain_map "
"file '{0}': {1}".format(map_path, exc)
) from exc
with open(general_config["index_prefix_domain_map"]) as f:
index_prefix_domain_map = yaml.safe_load(f)
if "offline" in general_config:
opts.offline = bool(general_config.getboolean("offline"))
if "strip_attachment_payloads" in general_config:
@@ -255,7 +242,7 @@ def _parse_config_file(config_file, opts):
except Exception as ns_error:
raise ConfigurationError(
"DNS pre-flight check failed: {}".format(ns_error)
) from ns_error
)
if not dummy_hostname:
raise ConfigurationError(
"DNS pre-flight check failed: no PTR record for {} from {}".format(
@@ -272,6 +259,8 @@ def _parse_config_file(config_file, opts):
opts.debug = bool(general_config.getboolean("debug"))
if "verbose" in general_config:
opts.verbose = bool(general_config.getboolean("verbose"))
if "silent" in general_config:
opts.silent = bool(general_config.getboolean("silent"))
if "warnings" in general_config:
opts.warnings = bool(general_config.getboolean("warnings"))
if "fail_on_output_error" in general_config:
@@ -599,9 +588,9 @@ def _parse_config_file(config_file, opts):
"index setting missing from the splunk_hec config section"
)
if "skip_certificate_verification" in hec_config:
opts.hec_skip_certificate_verification = bool(
hec_config.getboolean("skip_certificate_verification", fallback=False)
)
opts.hec_skip_certificate_verification = hec_config[
"skip_certificate_verification"
]
if "kafka" in config.sections():
kafka_config = config["kafka"]
@@ -631,14 +620,14 @@ def _parse_config_file(config_file, opts):
if "forensic_topic" in kafka_config:
opts.kafka_forensic_topic = kafka_config["forensic_topic"]
else:
raise ConfigurationError(
logger.critical(
"forensic_topic setting missing from the kafka config section"
)
if "smtp_tls_topic" in kafka_config:
opts.kafka_smtp_tls_topic = kafka_config["smtp_tls_topic"]
else:
raise ConfigurationError(
"smtp_tls_topic setting missing from the kafka config section"
logger.critical(
"forensic_topic setting missing from the splunk_hec config section"
)
if "smtp" in config.sections():
@@ -832,13 +821,6 @@ def _init_output_clients(opts):
Raises:
ConfigurationError: If a required output client cannot be created.
"""
# Validate all required settings before creating any clients so that a
# ConfigurationError does not leave partially-created clients un-closed.
if opts.hec and (opts.hec_token is None or opts.hec_index is None):
raise ConfigurationError(
"HEC token and HEC index are required when using HEC URL"
)
clients = {}
if opts.save_aggregate or opts.save_forensic or opts.save_smtp_tls:
@@ -912,98 +894,76 @@ def _init_output_clients(opts):
)
if opts.s3_bucket:
try:
clients["s3_client"] = s3.S3Client(
bucket_name=opts.s3_bucket,
bucket_path=opts.s3_path,
region_name=opts.s3_region_name,
endpoint_url=opts.s3_endpoint_url,
access_key_id=opts.s3_access_key_id,
secret_access_key=opts.s3_secret_access_key,
)
except Exception:
logger.warning("Failed to initialize S3 client; skipping", exc_info=True)
clients["s3_client"] = s3.S3Client(
bucket_name=opts.s3_bucket,
bucket_path=opts.s3_path,
region_name=opts.s3_region_name,
endpoint_url=opts.s3_endpoint_url,
access_key_id=opts.s3_access_key_id,
secret_access_key=opts.s3_secret_access_key,
)
if opts.syslog_server:
try:
clients["syslog_client"] = syslog.SyslogClient(
server_name=opts.syslog_server,
server_port=int(opts.syslog_port),
protocol=opts.syslog_protocol or "udp",
cafile_path=opts.syslog_cafile_path,
certfile_path=opts.syslog_certfile_path,
keyfile_path=opts.syslog_keyfile_path,
timeout=opts.syslog_timeout if opts.syslog_timeout is not None else 5.0,
retry_attempts=opts.syslog_retry_attempts
if opts.syslog_retry_attempts is not None
else 3,
retry_delay=opts.syslog_retry_delay
if opts.syslog_retry_delay is not None
else 5,
)
except Exception:
logger.warning(
"Failed to initialize syslog client; skipping", exc_info=True
)
clients["syslog_client"] = syslog.SyslogClient(
server_name=opts.syslog_server,
server_port=int(opts.syslog_port),
protocol=opts.syslog_protocol or "udp",
cafile_path=opts.syslog_cafile_path,
certfile_path=opts.syslog_certfile_path,
keyfile_path=opts.syslog_keyfile_path,
timeout=opts.syslog_timeout if opts.syslog_timeout is not None else 5.0,
retry_attempts=opts.syslog_retry_attempts
if opts.syslog_retry_attempts is not None
else 3,
retry_delay=opts.syslog_retry_delay
if opts.syslog_retry_delay is not None
else 5,
)
if opts.hec:
if opts.hec_token is None or opts.hec_index is None:
raise ConfigurationError(
"HEC token and HEC index are required when using HEC URL"
)
verify = True
if opts.hec_skip_certificate_verification:
verify = False
try:
clients["hec_client"] = splunk.HECClient(
opts.hec, opts.hec_token, opts.hec_index, verify=verify
)
except Exception:
logger.warning(
"Failed to initialize Splunk HEC client; skipping", exc_info=True
)
clients["hec_client"] = splunk.HECClient(
opts.hec, opts.hec_token, opts.hec_index, verify=verify
)
if opts.kafka_hosts:
ssl_context = None
if opts.kafka_ssl:
if opts.kafka_skip_certificate_verification:
logger.debug("Skipping Kafka certificate verification")
ssl_context = create_default_context()
if opts.kafka_skip_certificate_verification:
logger.debug("Skipping Kafka certificate verification")
ssl_context.check_hostname = False
ssl_context.verify_mode = CERT_NONE
try:
clients["kafka_client"] = kafkaclient.KafkaClient(
opts.kafka_hosts,
username=opts.kafka_username,
password=opts.kafka_password,
ssl=opts.kafka_ssl,
ssl_context=ssl_context,
)
except Exception:
logger.warning("Failed to initialize Kafka client; skipping", exc_info=True)
ssl_context.check_hostname = False
ssl_context.verify_mode = CERT_NONE
clients["kafka_client"] = kafkaclient.KafkaClient(
opts.kafka_hosts,
username=opts.kafka_username,
password=opts.kafka_password,
ssl_context=ssl_context,
)
if opts.gelf_host:
try:
clients["gelf_client"] = gelf.GelfClient(
host=opts.gelf_host,
port=int(opts.gelf_port),
mode=opts.gelf_mode,
)
except Exception:
logger.warning("Failed to initialize GELF client; skipping", exc_info=True)
clients["gelf_client"] = gelf.GelfClient(
host=opts.gelf_host,
port=int(opts.gelf_port),
mode=opts.gelf_mode,
)
if (
opts.webhook_aggregate_url
or opts.webhook_forensic_url
or opts.webhook_smtp_tls_url
):
try:
clients["webhook_client"] = webhook.WebhookClient(
aggregate_url=opts.webhook_aggregate_url,
forensic_url=opts.webhook_forensic_url,
smtp_tls_url=opts.webhook_smtp_tls_url,
timeout=opts.webhook_timeout,
)
except Exception:
logger.warning(
"Failed to initialize webhook client; skipping", exc_info=True
)
clients["webhook_client"] = webhook.WebhookClient(
aggregate_url=opts.webhook_aggregate_url,
forensic_url=opts.webhook_forensic_url,
smtp_tls_url=opts.webhook_smtp_tls_url,
timeout=opts.webhook_timeout,
)
return clients
@@ -1618,6 +1578,7 @@ def _main():
normalize_timespan_threshold_hours=24.0,
fail_on_output_error=False,
)
args = arg_parser.parse_args()
# Snapshot opts as set from CLI args / hardcoded defaults, before any config
# file is applied. On each SIGHUP reload we restore this baseline first so
@@ -1998,12 +1959,7 @@ def _main():
logger.info("Watching for email - Quit with ctrl-c")
while True:
# Re-check mailbox_watch in case a config reload disabled watch mode
if not opts.mailbox_watch:
logger.info(
"Mailbox watch disabled in reloaded configuration, stopping watcher"
)
break
_reload_requested = False
try:
watch_inbox(
mailbox_connection=mailbox_connection,
@@ -2036,10 +1992,7 @@ def _main():
if not _reload_requested:
break
# Reload configuration — clear the flag first so that any new
# SIGHUP arriving while we reload will be captured for the next
# iteration rather than being silently dropped.
_reload_requested = False
# Reload configuration
logger.info("Reloading configuration...")
try:
# Build a fresh opts starting from CLI-only defaults so that
@@ -2083,31 +2036,6 @@ def _main():
if opts.debug:
logger.setLevel(logging.DEBUG)
# Refresh FileHandler if log_file changed
old_log_file = getattr(opts, "active_log_file", None)
new_log_file = opts.log_file
if old_log_file != new_log_file:
# Remove old FileHandlers
for h in list(logger.handlers):
if isinstance(h, logging.FileHandler):
h.close()
logger.removeHandler(h)
# Add new FileHandler if configured
if new_log_file:
try:
fh = logging.FileHandler(new_log_file, "a")
file_formatter = logging.Formatter(
"%(asctime)s - %(levelname)s"
" - [%(filename)s:%(lineno)d] - %(message)s"
)
fh.setFormatter(file_formatter)
logger.addHandler(fh)
except Exception as log_error:
logger.warning(
"Unable to write to log file: {}".format(log_error)
)
opts.active_log_file = new_log_file
logger.info("Configuration reloaded successfully")
except Exception:
logger.exception(

View File

@@ -178,12 +178,10 @@ class GmailConnection(MailboxConnection):
def watch(self, check_callback, check_timeout, should_reload=None):
"""Checks the mailbox for new messages every n seconds"""
while True:
if should_reload and should_reload():
return
sleep(check_timeout)
check_callback(self)
if should_reload and should_reload():
return
check_callback(self)
@lru_cache(maxsize=10)
def _find_label_id_for_label(self, label_name: str) -> str:

View File

@@ -281,10 +281,10 @@ class MSGraphConnection(MailboxConnection):
def watch(self, check_callback, check_timeout, should_reload=None):
"""Checks the mailbox for new messages every n seconds"""
while True:
if should_reload and should_reload():
return
sleep(check_timeout)
check_callback(self)
if should_reload and should_reload():
return
@lru_cache(maxsize=10)
def _find_folder_id_from_folder_path(self, folder_name: str) -> str:

View File

@@ -4,7 +4,6 @@
from __future__ import absolute_import, print_function, unicode_literals
import os
import signal
import sys
import tempfile
import unittest
@@ -1926,7 +1925,6 @@ password = pass
watch = true
"""
@unittest.skipUnless(hasattr(signal, "SIGHUP"), "SIGHUP not available on this platform")
@patch("parsedmarc.cli._init_output_clients")
@patch("parsedmarc.cli._parse_config_file")
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
@@ -1992,7 +1990,6 @@ watch = true
# _parse_config_file called for initial load + reload
self.assertGreaterEqual(mock_parse_config.call_count, 2)
@unittest.skipUnless(hasattr(signal, "SIGHUP"), "SIGHUP not available on this platform")
@patch("parsedmarc.cli._init_output_clients")
@patch("parsedmarc.cli._parse_config_file")
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
@@ -2065,7 +2062,6 @@ watch = true
# The failed reload must not have closed the original clients
initial_clients["s3_client"].close.assert_not_called()
@unittest.skipUnless(hasattr(signal, "SIGHUP"), "SIGHUP not available on this platform")
@patch("parsedmarc.cli._init_output_clients")
@patch("parsedmarc.cli._parse_config_file")
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
@@ -2137,7 +2133,6 @@ watch = true
# Old client must have been closed when reload succeeded
old_client.close.assert_called_once()
@unittest.skipUnless(hasattr(signal, "SIGHUP"), "SIGHUP not available on this platform")
@patch("parsedmarc.cli._init_output_clients")
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
@patch("parsedmarc.cli.watch_inbox")