diff --git a/parsedmarc/cli.py b/parsedmarc/cli.py index 31b1091..9c47292 100644 --- a/parsedmarc/cli.py +++ b/parsedmarc/cli.py @@ -968,6 +968,23 @@ def _init_output_clients(opts): return clients +def _close_output_clients(clients): + """Close output clients that hold persistent connections. + + Clients that do not expose a ``close`` method are silently skipped. + Errors during closing are logged as warnings and do not propagate. + + Args: + clients: dict of client instances returned by :func:`_init_output_clients`. + """ + for name, client in clients.items(): + if hasattr(client, "close"): + try: + client.close() + except Exception: + logger.warning("Error closing %s", name, exc_info=True) + + def _main(): """Called when the module is executed""" @@ -1563,6 +1580,11 @@ def _main(): ) args = arg_parser.parse_args() + # Snapshot opts as set from CLI args / hardcoded defaults, before any config + # file is applied. On each SIGHUP reload we restore this baseline first so + # that sections removed from the config file actually take effect. + opts_from_cli = Namespace(**vars(opts)) + index_prefix_domain_map = None if args.config_file: @@ -1972,10 +1994,21 @@ def _main(): # Reload configuration logger.info("Reloading configuration...") - old_opts_snapshot = Namespace(**vars(opts)) try: - index_prefix_domain_map = _parse_config_file(args.config_file, opts) - clients = _init_output_clients(opts) + # Build a fresh opts starting from CLI-only defaults so that + # sections removed from the config file actually take effect. + new_opts = Namespace(**vars(opts_from_cli)) + new_index_prefix_domain_map = _parse_config_file( + args.config_file, new_opts + ) + new_clients = _init_output_clients(new_opts) + + # All steps succeeded — commit the changes atomically. + _close_output_clients(clients) + clients = new_clients + index_prefix_domain_map = new_index_prefix_domain_map + for k, v in vars(new_opts).items(): + setattr(opts, k, v) # Update watch parameters from reloaded config mailbox_batch_size_value = ( @@ -2008,9 +2041,6 @@ def _main(): logger.exception( "Config reload failed, continuing with previous config" ) - # Restore old opts - for k, v in vars(old_opts_snapshot).items(): - setattr(opts, k, v) if __name__ == "__main__": diff --git a/parsedmarc/gelf.py b/parsedmarc/gelf.py index 67f4f5d..4a6477b 100644 --- a/parsedmarc/gelf.py +++ b/parsedmarc/gelf.py @@ -69,3 +69,8 @@ class GelfClient(object): for row in rows: log_context_data.parsedmarc = row self.logger.info("parsedmarc smtptls report") + + def close(self): + """Remove and close the GELF handler, releasing its connection.""" + self.logger.removeHandler(self.handler) + self.handler.close() diff --git a/parsedmarc/kafkaclient.py b/parsedmarc/kafkaclient.py index e27c9b9..227e102 100644 --- a/parsedmarc/kafkaclient.py +++ b/parsedmarc/kafkaclient.py @@ -62,6 +62,10 @@ class KafkaClient(object): except NoBrokersAvailable: raise KafkaError("No Kafka brokers available") + def close(self): + """Close the Kafka producer, releasing background threads and sockets.""" + self.producer.close() + @staticmethod def strip_metadata(report: dict[str, Any]): """ diff --git a/parsedmarc/syslog.py b/parsedmarc/syslog.py index d96e56b..ec8e757 100644 --- a/parsedmarc/syslog.py +++ b/parsedmarc/syslog.py @@ -57,7 +57,7 @@ class SyslogClient(object): self.logger.setLevel(logging.INFO) # Create the appropriate syslog handler based on protocol - log_handler = self._create_syslog_handler( + self.log_handler = self._create_syslog_handler( server_name, server_port, self.protocol, @@ -69,7 +69,7 @@ class SyslogClient(object): retry_delay, ) - self.logger.addHandler(log_handler) + self.logger.addHandler(self.log_handler) def _create_syslog_handler( self, @@ -179,3 +179,8 @@ class SyslogClient(object): rows = parsed_smtp_tls_reports_to_csv_rows(smtp_tls_reports) for row in rows: self.logger.info(json.dumps(row)) + + def close(self): + """Remove and close the syslog handler, releasing its socket.""" + self.logger.removeHandler(self.log_handler) + self.log_handler.close() diff --git a/parsedmarc/webhook.py b/parsedmarc/webhook.py index 5dd05bf..9b6f66f 100644 --- a/parsedmarc/webhook.py +++ b/parsedmarc/webhook.py @@ -63,3 +63,7 @@ class WebhookClient(object): self.session.post(webhook_url, data=payload, timeout=self.timeout) except Exception as error_: logger.error("Webhook Error: {0}".format(error_.__str__())) + + def close(self): + """Close the underlying HTTP session.""" + self.session.close() diff --git a/tests.py b/tests.py index 3c6c89a..64ac057 100755 --- a/tests.py +++ b/tests.py @@ -1910,5 +1910,304 @@ certificate_path = /tmp/msgraph-cert.pem mock_get_mailbox_reports.assert_not_called() +class TestSighupReload(unittest.TestCase): + """Tests for SIGHUP-driven configuration reload in watch mode.""" + + _BASE_CONFIG = """[general] +silent = true + +[imap] +host = imap.example.com +user = user +password = pass + +[mailbox] +watch = true +""" + + @patch("parsedmarc.cli._init_output_clients") + @patch("parsedmarc.cli._parse_config_file") + @patch("parsedmarc.cli.get_dmarc_reports_from_mailbox") + @patch("parsedmarc.cli.watch_inbox") + @patch("parsedmarc.cli.IMAPConnection") + def testSighupTriggersReloadAndWatchRestarts( + self, + mock_imap, + mock_watch, + mock_get_reports, + mock_parse_config, + mock_init_clients, + ): + """SIGHUP causes watch to return, config is re-parsed, and watch restarts.""" + import signal as signal_module + + mock_imap.return_value = object() + mock_get_reports.return_value = { + "aggregate_reports": [], + "forensic_reports": [], + "smtp_tls_reports": [], + } + + def parse_side_effect(config_file, opts): + opts.imap_host = "imap.example.com" + opts.imap_user = "user" + opts.imap_password = "pass" + opts.mailbox_watch = True + return None + + mock_parse_config.side_effect = parse_side_effect + mock_init_clients.return_value = {} + + call_count = [0] + + def watch_side_effect(*args, **kwargs): + call_count[0] += 1 + if call_count[0] == 1: + # Simulate SIGHUP arriving while watch is running + if hasattr(signal_module, "SIGHUP"): + import os + + os.kill(os.getpid(), signal_module.SIGHUP) + return # Normal return — reload loop will continue + else: + raise FileExistsError("stop-watch-loop") + + mock_watch.side_effect = watch_side_effect + + with tempfile.NamedTemporaryFile("w", suffix=".ini", delete=False) as cfg: + cfg.write(self._BASE_CONFIG) + cfg_path = cfg.name + self.addCleanup(lambda: os.path.exists(cfg_path) and os.remove(cfg_path)) + + with patch.object(sys, "argv", ["parsedmarc", "-c", cfg_path]): + with self.assertRaises(SystemExit) as cm: + parsedmarc.cli._main() + + # Exited with code 1 (from FileExistsError handler) + self.assertEqual(cm.exception.code, 1) + # watch_inbox was called twice: initial run + after reload + self.assertEqual(mock_watch.call_count, 2) + # _parse_config_file called for initial load + reload + self.assertGreaterEqual(mock_parse_config.call_count, 2) + + @patch("parsedmarc.cli._init_output_clients") + @patch("parsedmarc.cli._parse_config_file") + @patch("parsedmarc.cli.get_dmarc_reports_from_mailbox") + @patch("parsedmarc.cli.watch_inbox") + @patch("parsedmarc.cli.IMAPConnection") + def testInvalidConfigOnReloadKeepsPreviousState( + self, + mock_imap, + mock_watch, + mock_get_reports, + mock_parse_config, + mock_init_clients, + ): + """A failing reload leaves opts and clients unchanged.""" + import signal as signal_module + + mock_imap.return_value = object() + mock_get_reports.return_value = { + "aggregate_reports": [], + "forensic_reports": [], + "smtp_tls_reports": [], + } + + # Initial parse sets required opts; reload parse raises + initial_map = {"prefix_": ["example.com"]} + call_count = [0] + + def parse_side_effect(config_file, opts): + call_count[0] += 1 + opts.imap_host = "imap.example.com" + opts.imap_user = "user" + opts.imap_password = "pass" + opts.mailbox_watch = True + if call_count[0] == 1: + return initial_map + raise RuntimeError("bad config") + + mock_parse_config.side_effect = parse_side_effect + + initial_clients = {"s3_client": MagicMock()} + mock_init_clients.return_value = initial_clients + + watch_calls = [0] + + def watch_side_effect(*args, **kwargs): + watch_calls[0] += 1 + if watch_calls[0] == 1: + if hasattr(signal_module, "SIGHUP"): + import os + + os.kill(os.getpid(), signal_module.SIGHUP) + return + else: + raise FileExistsError("stop") + + mock_watch.side_effect = watch_side_effect + + with tempfile.NamedTemporaryFile("w", suffix=".ini", delete=False) as cfg: + cfg.write(self._BASE_CONFIG) + cfg_path = cfg.name + self.addCleanup(lambda: os.path.exists(cfg_path) and os.remove(cfg_path)) + + with patch.object(sys, "argv", ["parsedmarc", "-c", cfg_path]): + with self.assertRaises(SystemExit) as cm: + parsedmarc.cli._main() + + self.assertEqual(cm.exception.code, 1) + # watch was still called twice (reload loop continued after failed reload) + self.assertEqual(mock_watch.call_count, 2) + # The failed reload must not have closed the original clients + initial_clients["s3_client"].close.assert_not_called() + + @patch("parsedmarc.cli._init_output_clients") + @patch("parsedmarc.cli._parse_config_file") + @patch("parsedmarc.cli.get_dmarc_reports_from_mailbox") + @patch("parsedmarc.cli.watch_inbox") + @patch("parsedmarc.cli.IMAPConnection") + def testReloadClosesOldClients( + self, + mock_imap, + mock_watch, + mock_get_reports, + mock_parse_config, + mock_init_clients, + ): + """Successful reload closes the old output clients before replacing them.""" + import signal as signal_module + + mock_imap.return_value = object() + mock_get_reports.return_value = { + "aggregate_reports": [], + "forensic_reports": [], + "smtp_tls_reports": [], + } + + def parse_side_effect(config_file, opts): + opts.imap_host = "imap.example.com" + opts.imap_user = "user" + opts.imap_password = "pass" + opts.mailbox_watch = True + return None + + mock_parse_config.side_effect = parse_side_effect + + old_client = MagicMock() + new_client = MagicMock() + init_call = [0] + + def init_side_effect(opts): + init_call[0] += 1 + if init_call[0] == 1: + return {"kafka_client": old_client} + return {"kafka_client": new_client} + + mock_init_clients.side_effect = init_side_effect + + watch_calls = [0] + + def watch_side_effect(*args, **kwargs): + watch_calls[0] += 1 + if watch_calls[0] == 1: + if hasattr(signal_module, "SIGHUP"): + import os + + os.kill(os.getpid(), signal_module.SIGHUP) + return + else: + raise FileExistsError("stop") + + mock_watch.side_effect = watch_side_effect + + with tempfile.NamedTemporaryFile("w", suffix=".ini", delete=False) as cfg: + cfg.write(self._BASE_CONFIG) + cfg_path = cfg.name + self.addCleanup(lambda: os.path.exists(cfg_path) and os.remove(cfg_path)) + + with patch.object(sys, "argv", ["parsedmarc", "-c", cfg_path]): + with self.assertRaises(SystemExit): + parsedmarc.cli._main() + + # Old client must have been closed when reload succeeded + old_client.close.assert_called_once() + + @patch("parsedmarc.cli._init_output_clients") + @patch("parsedmarc.cli.get_dmarc_reports_from_mailbox") + @patch("parsedmarc.cli.watch_inbox") + @patch("parsedmarc.cli.IMAPConnection") + def testRemovedConfigSectionTakesEffectOnReload( + self, + mock_imap, + mock_watch, + mock_get_reports, + mock_init_clients, + ): + """Removing a config section on reload resets that option to its default.""" + import signal as signal_module + + mock_imap.return_value = object() + mock_get_reports.return_value = { + "aggregate_reports": [], + "forensic_reports": [], + "smtp_tls_reports": [], + } + mock_init_clients.return_value = {} + + # First config sets kafka_hosts (with required topics); second removes it. + config_v1 = ( + self._BASE_CONFIG + + "\n[kafka]\nhosts = kafka.example.com:9092\n" + + "aggregate_topic = dmarc_agg\n" + + "forensic_topic = dmarc_forensic\n" + + "smtp_tls_topic = smtp_tls\n" + ) + config_v2 = self._BASE_CONFIG # no [kafka] section + + with tempfile.NamedTemporaryFile("w", suffix=".ini", delete=False) as cfg: + cfg.write(config_v1) + cfg_path = cfg.name + self.addCleanup(lambda: os.path.exists(cfg_path) and os.remove(cfg_path)) + + watch_calls = [0] + + def watch_side_effect(*args, **kwargs): + watch_calls[0] += 1 + if watch_calls[0] == 1: + # Rewrite config to remove kafka before triggering reload + with open(cfg_path, "w") as f: + f.write(config_v2) + if hasattr(signal_module, "SIGHUP"): + import os + + os.kill(os.getpid(), signal_module.SIGHUP) + return + else: + raise FileExistsError("stop") + + mock_watch.side_effect = watch_side_effect + + # Capture opts used on each _init_output_clients call + init_opts_captures = [] + + def init_side_effect(opts): + from argparse import Namespace as NS + + init_opts_captures.append(NS(**vars(opts))) + return {} + + mock_init_clients.side_effect = init_side_effect + + with patch.object(sys, "argv", ["parsedmarc", "-c", cfg_path]): + with self.assertRaises(SystemExit): + parsedmarc.cli._main() + + # First init: kafka_hosts should be set from v1 config + self.assertIsNotNone(init_opts_captures[0].kafka_hosts) + # Second init (after reload with v2 config): kafka_hosts should be None + self.assertIsNone(init_opts_captures[1].kafka_hosts) + + if __name__ == "__main__": unittest.main(verbosity=2)