From d3510da3a62faf96e8d28eb5847ad23d7835b3c7 Mon Sep 17 00:00:00 2001 From: Casper Biering Date: Sat, 13 Jun 2026 02:00:32 +0200 Subject: [PATCH] feat: graceful SIGTERM/SIGINT shutdown for watch mode and one-shot CLI (#794) * feat: graceful SIGTERM/SIGINT shutdown for watch mode and one-shot CLI Previously SIGTERM (systemctl stop, docker stop, Kubernetes pod termination) killed parsedmarc mid-batch, tearing output writes and silently dropping buffered Kafka records. Shutdown is now cooperative: - SIGTERM/SIGINT set a flag that is polled at safe boundaries. The one-shot CLI checks it between batches; watch mode passes it as `config_reloading` so the mailbox backend -- including the IMAP IDLE loop -- returns once the current batch is fully processed. Either way the in-flight batch and its output writes finish before the process exits 0. - Ctrl-C is a double-tap: the first press is graceful, the second short-circuits to os._exit(130). - Output clients are now closed on every exit path (atexit plus a trailing close in _main), fixing a long-standing leak where one-shot runs and graceful shutdowns never flushed Kafka / closed Elasticsearch / S3 / etc. Docs: the example systemd unit gains KillSignal=SIGTERM and TimeoutStopSec=60 (keep it above mailbox_check_timeout). Tests cover watch shutdown, the one-shot between-batch stop, the SIGINT double-tap, and the output-client-close leak. * test: cover the one-shot mbox-loop shutdown break Extend the one-shot SIGTERM test to also pass an .mbox path so a single run exercises both shutdown checkpoints: the file-batch loop break and the subsequent mbox loop break (which Codecov flagged as the only uncovered lines on PR #794). is_mbox is keyed by suffix and get_dmarc_reports_from_mbox is asserted not called, since the mbox loop breaks before reaching it. * test: narrow signal.getsignal() return before invoking in SIGINT test signal.getsignal() is typed Callable | int | Handlers | None; calling it directly fails pyright's callable check. Assert callable() first. Co-Authored-By: Claude Fable 5 --------- Co-authored-by: Sean Whalen <44679+seanthegeek@users.noreply.github.com> Co-authored-by: Claude Fable 5 --- CHANGELOG.md | 8 ++ docs/source/usage.md | 13 +++ parsedmarc/__init__.py | 4 +- parsedmarc/cli.py | 92 ++++++++++++--- tests/test_cli.py | 250 ++++++++++++++++++++++++++++++++++++++++- 5 files changed, 350 insertions(+), 17 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7fdb97a..b80b6a3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,13 @@ # Changelog +## 10.1.0 + +### Changes + +- **Graceful shutdown on `SIGTERM` / `SIGINT`.** Previously `SIGTERM` (sent by `systemctl stop`, `docker stop`, and Kubernetes pod termination) killed the process mid-batch, tearing output writes and silently dropping Kafka producer buffers. parsedmarc now sets a shutdown flag that is polled at safe boundaries: the one-shot CLI checks it between batches, and the watcher passes it as `config_reloading` so the mailbox backend — including the IMAP IDLE loop — returns once the current batch is processed. Either way the current batch and its output writes finish before the process exits. Ctrl-C is a double-tap: the first press is graceful, the second short-circuits to `os._exit(130)`. This requires the `config_reloading`-aware IMAP IDLE loop from the pinned `mailsuite` fork; with a stock `mailsuite` the IDLE watcher cannot be interrupted cleanly. +- **Output clients are now closed on every exit path** via `atexit` plus a trailing close at the end of `_main()`, fixing a long-standing leak where one-shot CLI runs and graceful shutdowns never flushed Kafka / closed Elasticsearch / S3 / etc. clients. +- **Example systemd unit** in `docs/source/usage.md` now sets `KillSignal=SIGTERM` and `TimeoutStopSec=60` so systemd waits long enough for the watcher to drain (keep it above `mailbox_check_timeout`). + ## 10.0.4 ### Docker diff --git a/docs/source/usage.md b/docs/source/usage.md index 94ab6bb..98fc7bd 100644 --- a/docs/source/usage.md +++ b/docs/source/usage.md @@ -818,6 +818,8 @@ After=network.target network-online.target elasticsearch.service [Service] ExecStart=/opt/parsedmarc/venv/bin/parsedmarc -c /etc/parsedmarc.ini ExecReload=/bin/kill -HUP $MAINPID +KillSignal=SIGTERM +TimeoutStopSec=60 User=parsedmarc Group=parsedmarc Restart=always @@ -850,6 +852,17 @@ sudo service parsedmarc restart ::: +:::{note} +On `systemctl stop`/`restart` (or Ctrl-C) `parsedmarc` finishes the +current batch, flushes its outputs, and exits cleanly. Shutdown is +observed at batch boundaries, so the worst-case delay is roughly +`mailbox_check_timeout` (default 30s) plus the batch's processing and +flush time. Keep `TimeoutStopSec` comfortably above +`mailbox_check_timeout` (≈2×, and raise both together) or systemd will +`SIGKILL` mid-batch. In the foreground, a second Ctrl-C force-quits +immediately, skipping the output flush. +::: + ### Reloading configuration without restarting When running in watch mode, `parsedmarc` supports reloading its diff --git a/parsedmarc/__init__.py b/parsedmarc/__init__.py index 585e1f1..3b7d906 100644 --- a/parsedmarc/__init__.py +++ b/parsedmarc/__init__.py @@ -2505,7 +2505,9 @@ def watch_inbox( since: Search for messages since certain time normalize_timespan_threshold_hours (float): Normalize timespans beyond this config_reloading: Optional callable that returns True when a config - reload has been requested (e.g. via SIGHUP) + reload (or shutdown) has been requested (e.g. via SIGHUP/SIGTERM). + Polled by the mailbox backend between checks, including the IMAP + IDLE loop, so the watcher exits cleanly at a safe boundary. """ def check_callback(connection): diff --git a/parsedmarc/cli.py b/parsedmarc/cli.py index c77bfc0..bfec01a 100644 --- a/parsedmarc/cli.py +++ b/parsedmarc/cli.py @@ -3,6 +3,7 @@ """A CLI for parsing DMARC reports""" +import atexit import http.client import json import logging @@ -1363,11 +1364,14 @@ def _close_output_clients(clients): Clients that do not expose a ``close`` method are silently skipped. Errors during closing are logged as warnings and do not propagate. + Idempotent: each client is popped as it is closed, so a second call + (e.g. the trailing close plus the atexit safety net) is a no-op. Args: clients: dict of client instances returned by :func:`_init_output_clients`. """ - for name, client in clients.items(): + while clients: + name, client = clients.popitem() if hasattr(client, "close"): try: client.close() @@ -2135,6 +2139,46 @@ def _main(): logger.error("Output client error: {0}".format(error_)) exit(1) + # Always close output clients on the way out (normal return, + # exit(N), uncaught exception, or SystemExit from a signal-driven + # shutdown). atexit does NOT fire on os._exit(130) — that's + # intentional for the SIGINT double-tap. The lambda closes whatever + # `clients` currently points at, so a SIGHUP reload that swaps the + # dict in-place is still covered. + atexit.register(lambda: _close_output_clients(clients)) + + # Signal handlers set a cooperative flag polled at safe checkpoints: + # the one-shot loops check it between batches; the watch loop relies + # on the mailbox backend polling `config_reloading` (which includes + # this flag) between checks, including inside the IMAP IDLE loop, so + # the current batch finishes before the watcher exits. SIGINT is a + # "double tap": the first press is graceful, the second short-circuits + # to os._exit(130). os._exit is async-signal-safe; sys.exit and + # logging are not, so the handlers only set flags / call os._exit. + _reload_requested = False + _shutdown_requested = False + _sigint_count = 0 + + def _handle_sighup(signum, frame): + nonlocal _reload_requested + _reload_requested = True + + def _handle_sigterm(signum, frame): + nonlocal _shutdown_requested + _shutdown_requested = True + + def _handle_sigint(signum, frame): + nonlocal _shutdown_requested, _sigint_count + _sigint_count += 1 + if _sigint_count >= 2: + os._exit(130) + _shutdown_requested = True + + if hasattr(signal, "SIGHUP"): + signal.signal(signal.SIGHUP, _handle_sighup) + signal.signal(signal.SIGTERM, _handle_sigterm) + signal.signal(signal.SIGINT, _handle_sigint) + file_paths = _expand_file_path_args(args.file_path) mbox_paths = [] @@ -2165,6 +2209,17 @@ def _main(): current_log_file = opts.log_file for batch_index in range((len(file_paths) + n_procs - 1) // n_procs): + # Honor a shutdown request between batches before spawning the + # next pool. Anything already parsed is still in `results` and + # will go through process_reports() in the cleanup path so we + # don't lose work the operator already paid for. + if _shutdown_requested: + logger.info( + "Shutdown requested, stopping file processing after %d batch(es)", + batch_index, + ) + break + processes = [] connections = [] @@ -2233,6 +2288,9 @@ def _main(): smtp_tls_reports.append(result[0]["report"]) for mbox_path in mbox_paths: + if _shutdown_requested: + logger.info("Shutdown requested, skipping remaining mbox files") + break normalize_timespan_threshold_hours_value = ( float(opts.normalize_timespan_threshold_hours) if opts.normalize_timespan_threshold_hours is not None @@ -2374,6 +2432,7 @@ def _main(): if opts.normalize_timespan_threshold_hours is not None else 24.0 ) + if mailbox_connection and not _shutdown_requested: try: reports = get_dmarc_reports_from_mailbox( connection=mailbox_connection, @@ -2441,20 +2500,8 @@ def _main(): logger.exception("Failed to email results") exit(1) - # SIGHUP-based config reload for watch mode - _reload_requested = False - - def _handle_sighup(signum, frame): - nonlocal _reload_requested - # Logging is not async-signal-safe; only set the flag here. - # The log message is emitted from the main loop when the flag is read. - _reload_requested = True - - if hasattr(signal, "SIGHUP"): - signal.signal(signal.SIGHUP, _handle_sighup) - if mailbox_connection and opts.mailbox_watch: - logger.info("Watching for email - Quit with ctrl-c") + logger.info("Watching for email - Ctrl-C once to quit, twice to force") while True: # Re-check mailbox_watch in case a config reload disabled watch mode @@ -2464,6 +2511,10 @@ def _main(): ) break try: + # `config_reloading` returns True on SIGHUP (reload) or + # SIGTERM/SIGINT (shutdown); the backend polls it between + # checks — including inside the IMAP IDLE loop — and returns + # at a safe boundary once the current batch is processed. watch_inbox( mailbox_connection=mailbox_connection, callback=process_reports, @@ -2484,7 +2535,7 @@ def _main(): reverse_dns_map_url=opts.reverse_dns_map_url, offline=opts.offline, normalize_timespan_threshold_hours=normalize_timespan_threshold_hours_value, - config_reloading=lambda: _reload_requested, + config_reloading=lambda: _reload_requested or _shutdown_requested, ) except FileExistsError as error: logger.error("{0}".format(error.__str__())) @@ -2493,6 +2544,12 @@ def _main(): logger.error(error.__str__()) exit(1) + # Prioritize shutdown over reload if both flags are set (e.g. + # SIGHUP followed by SIGTERM). atexit closes output clients. + if _shutdown_requested: + logger.info("Shutdown requested, exiting watch loop") + break + if not _reload_requested: break @@ -2610,6 +2667,11 @@ def _main(): "Config reload failed, continuing with previous config" ) + # Close output clients on the success path (one-shot or graceful + # watch-loop exit). atexit-registered above is the safety net for + # exit(1) / uncaught-exception paths. + _close_output_clients(clients) + if __name__ == "__main__": _main() diff --git a/tests/test_cli.py b/tests/test_cli.py index f0bc3cf..b208a7d 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -1612,7 +1612,7 @@ watch = true self.assertEqual(cm.exception.code, 1) # watch was still called twice (reload loop continued after failed reload) self.assertEqual(mock_watch.call_count, 2) - # The failed reload must not have closed the original clients + # Old clients should NOT have been closed (reload failed before swap) initial_clients["s3_client"].close.assert_not_called() @unittest.skipUnless( @@ -1870,6 +1870,254 @@ watch = true ) +class TestSigtermShutdown(unittest.TestCase): + """Tests for graceful SIGTERM/SIGINT shutdown.""" + + def setUp(self): + from parsedmarc.log import logger as _logger + + _logger.disabled = True + self._stdout_patch = patch("sys.stdout", new_callable=io.StringIO) + self._stderr_patch = patch("sys.stderr", new_callable=io.StringIO) + self._stdout_patch.start() + self._stderr_patch.start() + + def tearDown(self): + from parsedmarc.log import logger as _logger + + _logger.disabled = False + self._stderr_patch.stop() + self._stdout_patch.stop() + + _BASE_CONFIG = """[general] +silent = true + +[imap] +host = imap.example.com +user = user +password = pass + +[mailbox] +watch = true +""" + + def _write_config(self, body=None): + with tempfile.NamedTemporaryFile("w", suffix=".ini", delete=False) as cfg: + cfg.write(body if body is not None else self._BASE_CONFIG) + cfg_path = cfg.name + self.addCleanup(lambda: os.path.exists(cfg_path) and os.remove(cfg_path)) + return cfg_path + + @staticmethod + def _empty_reports(): + return { + "aggregate_reports": [], + "failure_reports": [], + "smtp_tls_reports": [], + } + + @staticmethod + def _parse_config_side_effect(config, opts): + opts.imap_host = "imap.example.com" + opts.imap_user = "user" + opts.imap_password = "pass" + opts.mailbox_watch = True + return None + + @patch("parsedmarc.cli._init_output_clients") + @patch("parsedmarc.cli._parse_config") + @patch("parsedmarc.cli._load_config") + @patch("parsedmarc.cli.get_dmarc_reports_from_mailbox") + @patch("parsedmarc.cli.watch_inbox") + @patch("parsedmarc.cli.IMAPConnection") + def testSigtermDuringWatchExitsCleanlyAndClosesClients( + self, + mock_imap, + mock_watch, + mock_get_reports, + mock_load_config, + mock_parse_config, + mock_init_clients, + ): + """SIGTERM during watch: the backend polls config_reloading, + observes the flag, and returns at a safe boundary; _main breaks + the watch loop, returns normally, and closes every output client + that exposes a `.close()`.""" + mock_imap.return_value = object() + mock_load_config.return_value = ConfigParser() + mock_parse_config.side_effect = self._parse_config_side_effect + mock_get_reports.return_value = self._empty_reports() + + kafka_client = MagicMock(spec=["close"]) + elasticsearch_client = MagicMock(spec=["close"]) + no_close_client = MagicMock(spec=[]) # no `close` attr → skipped + mock_init_clients.return_value = { + "kafka": kafka_client, + "elasticsearch": elasticsearch_client, + "syslog": no_close_client, + } + + observed = [] + + def watch_side_effect(*args, **kwargs): + # SIGTERM lands while watching; the backend then polls + # config_reloading at its next safe boundary and returns. + os.kill(os.getpid(), signal.SIGTERM) + observed.append(kwargs["config_reloading"]()) + + mock_watch.side_effect = watch_side_effect + cfg_path = self._write_config() + + with patch.object(sys, "argv", ["parsedmarc", "-c", cfg_path]): + parsedmarc.cli._main() + + self.assertEqual(mock_watch.call_count, 1) + self.assertEqual(observed, [True]) + kafka_client.close.assert_called() + elasticsearch_client.close.assert_called() + + @patch("parsedmarc.cli._init_output_clients") + @patch("parsedmarc.cli._parse_config") + @patch("parsedmarc.cli._load_config") + @patch("parsedmarc.cli.get_dmarc_reports_from_mailbox") + @patch("parsedmarc.cli.watch_inbox") + @patch("parsedmarc.cli.IMAPConnection") + def testFirstSigintGracefulSecondSigintHardExits( + self, + mock_imap, + mock_watch, + mock_get_reports, + mock_load_config, + mock_parse_config, + mock_init_clients, + ): + """First SIGINT → graceful flag, second SIGINT → os._exit(130). + + The installed handler is invoked directly via signal.getsignal() + because two POSIX SIGINTs sent in rapid succession from the same + process can be coalesced by the kernel (standard signals don't + queue).""" + mock_imap.return_value = object() + mock_load_config.return_value = ConfigParser() + mock_parse_config.side_effect = self._parse_config_side_effect + mock_get_reports.return_value = self._empty_reports() + mock_init_clients.return_value = {} + + sentinel = SystemExit("os._exit was reached") + + def fake_exit(code): + raise sentinel + + def watch_side_effect(*args, **kwargs): + handler = signal.getsignal(signal.SIGINT) + # getsignal() can return SIG_DFL/SIG_IGN/None; narrow the type + # so the handler can be invoked directly. + assert callable(handler) + handler(signal.SIGINT, None) # first press: graceful flag + handler(signal.SIGINT, None) # second press: hits os._exit + + mock_watch.side_effect = watch_side_effect + cfg_path = self._write_config() + + with patch("parsedmarc.cli.os._exit", side_effect=fake_exit) as mock_exit: + with patch.object(sys, "argv", ["parsedmarc", "-c", cfg_path]): + with self.assertRaises(SystemExit) as cm: + parsedmarc.cli._main() + + self.assertIs(cm.exception, sentinel) + mock_exit.assert_called_once_with(130) + + @patch("parsedmarc.cli.get_dmarc_reports_from_mbox") + @patch("parsedmarc.cli.is_mbox", side_effect=lambda p: p.endswith(".mbox")) + @patch("parsedmarc.cli._init_output_clients") + @patch("parsedmarc.cli.Process") + @patch("parsedmarc.cli.glob") + def testSigtermDuringOneShotStopsBetweenBatchesAndMbox( + self, + mock_glob, + mock_process_cls, + mock_init_clients, + mock_is_mbox, + mock_get_mbox, + ): + """SIGTERM during one-shot processing: the in-flight child is + joined normally (no work lost), the file-batch loop stops before + spawning the next batch, and the subsequent mbox loop breaks on + its first iteration (the flag is already set). Output clients are + still closed. + + Two ``.xml`` files give the batch loop a second iteration to hit + its break; one ``.mbox`` file routes into ``mbox_paths`` so the + mbox break is exercised too. ``is_mbox`` is keyed by suffix so the + fake filenames don't trigger ``mailbox.mbox(path, create=True)``.""" + mock_glob.return_value = ["a.xml", "b.xml", "c.mbox"] + + kafka_client = MagicMock(spec=["close"]) + mock_init_clients.return_value = {"kafka": kafka_client} + + starts = [] + + class FakeProc: + """Stand-in child that finishes its file and sends a result + even though SIGTERM arrived mid-batch.""" + + def __init__(self, target=None, args=()): + self._args = args + + def start(self): + starts.append(self._args[0]) + if len(starts) == 1: + os.kill(os.getpid(), signal.SIGTERM) + # Child still completes and reports back over the pipe. + self._args[-3].send([None, self._args[0]]) + + def join(self, timeout=None): + return None + + mock_process_cls.side_effect = FakeProc + + with patch.object(sys, "argv", ["parsedmarc", "a.xml", "b.xml", "c.mbox"]): + parsedmarc.cli._main() + + # Only the first xml batch ran before the batch loop broke, and the + # mbox loop broke before processing its file. + self.assertEqual(len(starts), 1) + mock_get_mbox.assert_not_called() + kafka_client.close.assert_called() + + @patch("parsedmarc.cli._init_output_clients") + @patch("parsedmarc.cli.cli_parse") + @patch("parsedmarc.cli.glob") + def testNormalOneShotExitClosesOutputClients( + self, + mock_glob, + mock_cli_parse, + mock_init_clients, + ): + """A successful one-shot run with no signal still closes its + output clients — regression for the long-standing leak where + _close_output_clients was only called inside the SIGHUP + reload path.""" + mock_glob.return_value = [] + kafka_client = MagicMock(spec=["close"]) + es_client = MagicMock(spec=["close"]) + mock_init_clients.return_value = { + "kafka": kafka_client, + "elasticsearch": es_client, + } + + # No watch, no mailbox, no files → _main runs through with + # empty parsing_results and returns normally. + with patch.object(sys, "argv", ["parsedmarc", "nothing-here.xml"]): + try: + parsedmarc.cli._main() + except SystemExit: + pass + + kafka_client.close.assert_called_once() + es_client.close.assert_called_once() + + class TestIndexPrefixDomainMapTlsFiltering(unittest.TestCase): """Tests that SMTP TLS reports for unmapped domains are filtered out when index_prefix_domain_map is configured."""