feat: graceful SIGTERM/SIGINT shutdown for watch mode and one-shot CLI (#794)

* feat: graceful SIGTERM/SIGINT shutdown for watch mode and one-shot CLI

Previously SIGTERM (systemctl stop, docker stop, Kubernetes pod termination)
killed parsedmarc mid-batch, tearing output writes and silently dropping
buffered Kafka records. Shutdown is now cooperative:

- SIGTERM/SIGINT set a flag that is polled at safe boundaries. The one-shot
  CLI checks it between batches; watch mode passes it as `config_reloading` so
  the mailbox backend -- including the IMAP IDLE loop -- returns once the
  current batch is fully processed. Either way the in-flight batch and its
  output writes finish before the process exits 0.
- Ctrl-C is a double-tap: the first press is graceful, the second
  short-circuits to os._exit(130).
- Output clients are now closed on every exit path (atexit plus a trailing
  close in _main), fixing a long-standing leak where one-shot runs and
  graceful shutdowns never flushed Kafka / closed Elasticsearch / S3 / etc.

Docs: the example systemd unit gains KillSignal=SIGTERM and TimeoutStopSec=60
(keep it above mailbox_check_timeout). Tests cover watch shutdown, the one-shot
between-batch stop, the SIGINT double-tap, and the output-client-close leak.

* test: cover the one-shot mbox-loop shutdown break

Extend the one-shot SIGTERM test to also pass an .mbox path so a single
run exercises both shutdown checkpoints: the file-batch loop break and the
subsequent mbox loop break (which Codecov flagged as the only uncovered
lines on PR #794). is_mbox is keyed by suffix and get_dmarc_reports_from_mbox
is asserted not called, since the mbox loop breaks before reaching it.

* test: narrow signal.getsignal() return before invoking in SIGINT test

signal.getsignal() is typed Callable | int | Handlers | None; calling it
directly fails pyright's callable check. Assert callable() first.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>

---------

Co-authored-by: Sean Whalen <44679+seanthegeek@users.noreply.github.com>
Co-authored-by: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
Casper Biering
2026-06-13 02:00:32 +02:00
committed by GitHub
parent b869235224
commit d3510da3a6
5 changed files with 350 additions and 17 deletions
+8
View File
@@ -1,5 +1,13 @@
# Changelog
## 10.1.0
### Changes
- **Graceful shutdown on `SIGTERM` / `SIGINT`.** Previously `SIGTERM` (sent by `systemctl stop`, `docker stop`, and Kubernetes pod termination) killed the process mid-batch, tearing output writes and silently dropping Kafka producer buffers. parsedmarc now sets a shutdown flag that is polled at safe boundaries: the one-shot CLI checks it between batches, and the watcher passes it as `config_reloading` so the mailbox backend — including the IMAP IDLE loop — returns once the current batch is processed. Either way the current batch and its output writes finish before the process exits. Ctrl-C is a double-tap: the first press is graceful, the second short-circuits to `os._exit(130)`. This requires the `config_reloading`-aware IMAP IDLE loop from the pinned `mailsuite` fork; with a stock `mailsuite` the IDLE watcher cannot be interrupted cleanly.
- **Output clients are now closed on every exit path** via `atexit` plus a trailing close at the end of `_main()`, fixing a long-standing leak where one-shot CLI runs and graceful shutdowns never flushed Kafka / closed Elasticsearch / S3 / etc. clients.
- **Example systemd unit** in `docs/source/usage.md` now sets `KillSignal=SIGTERM` and `TimeoutStopSec=60` so systemd waits long enough for the watcher to drain (keep it above `mailbox_check_timeout`).
## 10.0.4
### Docker
+13
View File
@@ -818,6 +818,8 @@ After=network.target network-online.target elasticsearch.service
[Service]
ExecStart=/opt/parsedmarc/venv/bin/parsedmarc -c /etc/parsedmarc.ini
ExecReload=/bin/kill -HUP $MAINPID
KillSignal=SIGTERM
TimeoutStopSec=60
User=parsedmarc
Group=parsedmarc
Restart=always
@@ -850,6 +852,17 @@ sudo service parsedmarc restart
:::
:::{note}
On `systemctl stop`/`restart` (or Ctrl-C) `parsedmarc` finishes the
current batch, flushes its outputs, and exits cleanly. Shutdown is
observed at batch boundaries, so the worst-case delay is roughly
`mailbox_check_timeout` (default 30s) plus the batch's processing and
flush time. Keep `TimeoutStopSec` comfortably above
`mailbox_check_timeout` (≈2×, and raise both together) or systemd will
`SIGKILL` mid-batch. In the foreground, a second Ctrl-C force-quits
immediately, skipping the output flush.
:::
### Reloading configuration without restarting
When running in watch mode, `parsedmarc` supports reloading its
+3 -1
View File
@@ -2505,7 +2505,9 @@ def watch_inbox(
since: Search for messages since certain time
normalize_timespan_threshold_hours (float): Normalize timespans beyond this
config_reloading: Optional callable that returns True when a config
reload has been requested (e.g. via SIGHUP)
reload (or shutdown) has been requested (e.g. via SIGHUP/SIGTERM).
Polled by the mailbox backend between checks, including the IMAP
IDLE loop, so the watcher exits cleanly at a safe boundary.
"""
def check_callback(connection):
+77 -15
View File
@@ -3,6 +3,7 @@
"""A CLI for parsing DMARC reports"""
import atexit
import http.client
import json
import logging
@@ -1363,11 +1364,14 @@ def _close_output_clients(clients):
Clients that do not expose a ``close`` method are silently skipped.
Errors during closing are logged as warnings and do not propagate.
Idempotent: each client is popped as it is closed, so a second call
(e.g. the trailing close plus the atexit safety net) is a no-op.
Args:
clients: dict of client instances returned by :func:`_init_output_clients`.
"""
for name, client in clients.items():
while clients:
name, client = clients.popitem()
if hasattr(client, "close"):
try:
client.close()
@@ -2135,6 +2139,46 @@ def _main():
logger.error("Output client error: {0}".format(error_))
exit(1)
# Always close output clients on the way out (normal return,
# exit(N), uncaught exception, or SystemExit from a signal-driven
# shutdown). atexit does NOT fire on os._exit(130) — that's
# intentional for the SIGINT double-tap. The lambda closes whatever
# `clients` currently points at, so a SIGHUP reload that swaps the
# dict in-place is still covered.
atexit.register(lambda: _close_output_clients(clients))
# Signal handlers set a cooperative flag polled at safe checkpoints:
# the one-shot loops check it between batches; the watch loop relies
# on the mailbox backend polling `config_reloading` (which includes
# this flag) between checks, including inside the IMAP IDLE loop, so
# the current batch finishes before the watcher exits. SIGINT is a
# "double tap": the first press is graceful, the second short-circuits
# to os._exit(130). os._exit is async-signal-safe; sys.exit and
# logging are not, so the handlers only set flags / call os._exit.
_reload_requested = False
_shutdown_requested = False
_sigint_count = 0
def _handle_sighup(signum, frame):
nonlocal _reload_requested
_reload_requested = True
def _handle_sigterm(signum, frame):
nonlocal _shutdown_requested
_shutdown_requested = True
def _handle_sigint(signum, frame):
nonlocal _shutdown_requested, _sigint_count
_sigint_count += 1
if _sigint_count >= 2:
os._exit(130)
_shutdown_requested = True
if hasattr(signal, "SIGHUP"):
signal.signal(signal.SIGHUP, _handle_sighup)
signal.signal(signal.SIGTERM, _handle_sigterm)
signal.signal(signal.SIGINT, _handle_sigint)
file_paths = _expand_file_path_args(args.file_path)
mbox_paths = []
@@ -2165,6 +2209,17 @@ def _main():
current_log_file = opts.log_file
for batch_index in range((len(file_paths) + n_procs - 1) // n_procs):
# Honor a shutdown request between batches before spawning the
# next pool. Anything already parsed is still in `results` and
# will go through process_reports() in the cleanup path so we
# don't lose work the operator already paid for.
if _shutdown_requested:
logger.info(
"Shutdown requested, stopping file processing after %d batch(es)",
batch_index,
)
break
processes = []
connections = []
@@ -2233,6 +2288,9 @@ def _main():
smtp_tls_reports.append(result[0]["report"])
for mbox_path in mbox_paths:
if _shutdown_requested:
logger.info("Shutdown requested, skipping remaining mbox files")
break
normalize_timespan_threshold_hours_value = (
float(opts.normalize_timespan_threshold_hours)
if opts.normalize_timespan_threshold_hours is not None
@@ -2374,6 +2432,7 @@ def _main():
if opts.normalize_timespan_threshold_hours is not None
else 24.0
)
if mailbox_connection and not _shutdown_requested:
try:
reports = get_dmarc_reports_from_mailbox(
connection=mailbox_connection,
@@ -2441,20 +2500,8 @@ def _main():
logger.exception("Failed to email results")
exit(1)
# SIGHUP-based config reload for watch mode
_reload_requested = False
def _handle_sighup(signum, frame):
nonlocal _reload_requested
# Logging is not async-signal-safe; only set the flag here.
# The log message is emitted from the main loop when the flag is read.
_reload_requested = True
if hasattr(signal, "SIGHUP"):
signal.signal(signal.SIGHUP, _handle_sighup)
if mailbox_connection and opts.mailbox_watch:
logger.info("Watching for email - Quit with ctrl-c")
logger.info("Watching for email - Ctrl-C once to quit, twice to force")
while True:
# Re-check mailbox_watch in case a config reload disabled watch mode
@@ -2464,6 +2511,10 @@ def _main():
)
break
try:
# `config_reloading` returns True on SIGHUP (reload) or
# SIGTERM/SIGINT (shutdown); the backend polls it between
# checks — including inside the IMAP IDLE loop — and returns
# at a safe boundary once the current batch is processed.
watch_inbox(
mailbox_connection=mailbox_connection,
callback=process_reports,
@@ -2484,7 +2535,7 @@ def _main():
reverse_dns_map_url=opts.reverse_dns_map_url,
offline=opts.offline,
normalize_timespan_threshold_hours=normalize_timespan_threshold_hours_value,
config_reloading=lambda: _reload_requested,
config_reloading=lambda: _reload_requested or _shutdown_requested,
)
except FileExistsError as error:
logger.error("{0}".format(error.__str__()))
@@ -2493,6 +2544,12 @@ def _main():
logger.error(error.__str__())
exit(1)
# Prioritize shutdown over reload if both flags are set (e.g.
# SIGHUP followed by SIGTERM). atexit closes output clients.
if _shutdown_requested:
logger.info("Shutdown requested, exiting watch loop")
break
if not _reload_requested:
break
@@ -2610,6 +2667,11 @@ def _main():
"Config reload failed, continuing with previous config"
)
# Close output clients on the success path (one-shot or graceful
# watch-loop exit). atexit-registered above is the safety net for
# exit(1) / uncaught-exception paths.
_close_output_clients(clients)
if __name__ == "__main__":
_main()
+249 -1
View File
@@ -1612,7 +1612,7 @@ watch = true
self.assertEqual(cm.exception.code, 1)
# watch was still called twice (reload loop continued after failed reload)
self.assertEqual(mock_watch.call_count, 2)
# The failed reload must not have closed the original clients
# Old clients should NOT have been closed (reload failed before swap)
initial_clients["s3_client"].close.assert_not_called()
@unittest.skipUnless(
@@ -1870,6 +1870,254 @@ watch = true
)
class TestSigtermShutdown(unittest.TestCase):
"""Tests for graceful SIGTERM/SIGINT shutdown."""
def setUp(self):
from parsedmarc.log import logger as _logger
_logger.disabled = True
self._stdout_patch = patch("sys.stdout", new_callable=io.StringIO)
self._stderr_patch = patch("sys.stderr", new_callable=io.StringIO)
self._stdout_patch.start()
self._stderr_patch.start()
def tearDown(self):
from parsedmarc.log import logger as _logger
_logger.disabled = False
self._stderr_patch.stop()
self._stdout_patch.stop()
_BASE_CONFIG = """[general]
silent = true
[imap]
host = imap.example.com
user = user
password = pass
[mailbox]
watch = true
"""
def _write_config(self, body=None):
with tempfile.NamedTemporaryFile("w", suffix=".ini", delete=False) as cfg:
cfg.write(body if body is not None else self._BASE_CONFIG)
cfg_path = cfg.name
self.addCleanup(lambda: os.path.exists(cfg_path) and os.remove(cfg_path))
return cfg_path
@staticmethod
def _empty_reports():
return {
"aggregate_reports": [],
"failure_reports": [],
"smtp_tls_reports": [],
}
@staticmethod
def _parse_config_side_effect(config, opts):
opts.imap_host = "imap.example.com"
opts.imap_user = "user"
opts.imap_password = "pass"
opts.mailbox_watch = True
return None
@patch("parsedmarc.cli._init_output_clients")
@patch("parsedmarc.cli._parse_config")
@patch("parsedmarc.cli._load_config")
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
@patch("parsedmarc.cli.watch_inbox")
@patch("parsedmarc.cli.IMAPConnection")
def testSigtermDuringWatchExitsCleanlyAndClosesClients(
self,
mock_imap,
mock_watch,
mock_get_reports,
mock_load_config,
mock_parse_config,
mock_init_clients,
):
"""SIGTERM during watch: the backend polls config_reloading,
observes the flag, and returns at a safe boundary; _main breaks
the watch loop, returns normally, and closes every output client
that exposes a `.close()`."""
mock_imap.return_value = object()
mock_load_config.return_value = ConfigParser()
mock_parse_config.side_effect = self._parse_config_side_effect
mock_get_reports.return_value = self._empty_reports()
kafka_client = MagicMock(spec=["close"])
elasticsearch_client = MagicMock(spec=["close"])
no_close_client = MagicMock(spec=[]) # no `close` attr → skipped
mock_init_clients.return_value = {
"kafka": kafka_client,
"elasticsearch": elasticsearch_client,
"syslog": no_close_client,
}
observed = []
def watch_side_effect(*args, **kwargs):
# SIGTERM lands while watching; the backend then polls
# config_reloading at its next safe boundary and returns.
os.kill(os.getpid(), signal.SIGTERM)
observed.append(kwargs["config_reloading"]())
mock_watch.side_effect = watch_side_effect
cfg_path = self._write_config()
with patch.object(sys, "argv", ["parsedmarc", "-c", cfg_path]):
parsedmarc.cli._main()
self.assertEqual(mock_watch.call_count, 1)
self.assertEqual(observed, [True])
kafka_client.close.assert_called()
elasticsearch_client.close.assert_called()
@patch("parsedmarc.cli._init_output_clients")
@patch("parsedmarc.cli._parse_config")
@patch("parsedmarc.cli._load_config")
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
@patch("parsedmarc.cli.watch_inbox")
@patch("parsedmarc.cli.IMAPConnection")
def testFirstSigintGracefulSecondSigintHardExits(
self,
mock_imap,
mock_watch,
mock_get_reports,
mock_load_config,
mock_parse_config,
mock_init_clients,
):
"""First SIGINT → graceful flag, second SIGINT → os._exit(130).
The installed handler is invoked directly via signal.getsignal()
because two POSIX SIGINTs sent in rapid succession from the same
process can be coalesced by the kernel (standard signals don't
queue)."""
mock_imap.return_value = object()
mock_load_config.return_value = ConfigParser()
mock_parse_config.side_effect = self._parse_config_side_effect
mock_get_reports.return_value = self._empty_reports()
mock_init_clients.return_value = {}
sentinel = SystemExit("os._exit was reached")
def fake_exit(code):
raise sentinel
def watch_side_effect(*args, **kwargs):
handler = signal.getsignal(signal.SIGINT)
# getsignal() can return SIG_DFL/SIG_IGN/None; narrow the type
# so the handler can be invoked directly.
assert callable(handler)
handler(signal.SIGINT, None) # first press: graceful flag
handler(signal.SIGINT, None) # second press: hits os._exit
mock_watch.side_effect = watch_side_effect
cfg_path = self._write_config()
with patch("parsedmarc.cli.os._exit", side_effect=fake_exit) as mock_exit:
with patch.object(sys, "argv", ["parsedmarc", "-c", cfg_path]):
with self.assertRaises(SystemExit) as cm:
parsedmarc.cli._main()
self.assertIs(cm.exception, sentinel)
mock_exit.assert_called_once_with(130)
@patch("parsedmarc.cli.get_dmarc_reports_from_mbox")
@patch("parsedmarc.cli.is_mbox", side_effect=lambda p: p.endswith(".mbox"))
@patch("parsedmarc.cli._init_output_clients")
@patch("parsedmarc.cli.Process")
@patch("parsedmarc.cli.glob")
def testSigtermDuringOneShotStopsBetweenBatchesAndMbox(
self,
mock_glob,
mock_process_cls,
mock_init_clients,
mock_is_mbox,
mock_get_mbox,
):
"""SIGTERM during one-shot processing: the in-flight child is
joined normally (no work lost), the file-batch loop stops before
spawning the next batch, and the subsequent mbox loop breaks on
its first iteration (the flag is already set). Output clients are
still closed.
Two ``.xml`` files give the batch loop a second iteration to hit
its break; one ``.mbox`` file routes into ``mbox_paths`` so the
mbox break is exercised too. ``is_mbox`` is keyed by suffix so the
fake filenames don't trigger ``mailbox.mbox(path, create=True)``."""
mock_glob.return_value = ["a.xml", "b.xml", "c.mbox"]
kafka_client = MagicMock(spec=["close"])
mock_init_clients.return_value = {"kafka": kafka_client}
starts = []
class FakeProc:
"""Stand-in child that finishes its file and sends a result
even though SIGTERM arrived mid-batch."""
def __init__(self, target=None, args=()):
self._args = args
def start(self):
starts.append(self._args[0])
if len(starts) == 1:
os.kill(os.getpid(), signal.SIGTERM)
# Child still completes and reports back over the pipe.
self._args[-3].send([None, self._args[0]])
def join(self, timeout=None):
return None
mock_process_cls.side_effect = FakeProc
with patch.object(sys, "argv", ["parsedmarc", "a.xml", "b.xml", "c.mbox"]):
parsedmarc.cli._main()
# Only the first xml batch ran before the batch loop broke, and the
# mbox loop broke before processing its file.
self.assertEqual(len(starts), 1)
mock_get_mbox.assert_not_called()
kafka_client.close.assert_called()
@patch("parsedmarc.cli._init_output_clients")
@patch("parsedmarc.cli.cli_parse")
@patch("parsedmarc.cli.glob")
def testNormalOneShotExitClosesOutputClients(
self,
mock_glob,
mock_cli_parse,
mock_init_clients,
):
"""A successful one-shot run with no signal still closes its
output clients regression for the long-standing leak where
_close_output_clients was only called inside the SIGHUP
reload path."""
mock_glob.return_value = []
kafka_client = MagicMock(spec=["close"])
es_client = MagicMock(spec=["close"])
mock_init_clients.return_value = {
"kafka": kafka_client,
"elasticsearch": es_client,
}
# No watch, no mailbox, no files → _main runs through with
# empty parsing_results and returns normally.
with patch.object(sys, "argv", ["parsedmarc", "nothing-here.xml"]):
try:
parsedmarc.cli._main()
except SystemExit:
pass
kafka_client.close.assert_called_once()
es_client.close.assert_called_once()
class TestIndexPrefixDomainMapTlsFiltering(unittest.TestCase):
"""Tests that SMTP TLS reports for unmapped domains are filtered out
when index_prefix_domain_map is configured."""