Compare commits

...

2 Commits

Author SHA1 Message Date
Sean Whalen
9849598100 Formatting 2026-03-21 16:17:35 -04:00
Sean Whalen
e82f3e58a1 SIGHUP-based configuration reload for watch mode (#697)
* Enhance mailbox connection watch method to support reload functionality

- Updated the `watch` method in `GmailConnection`, `MSGraphConnection`, `IMAPConnection`, `MaildirConnection`, and the abstract `MailboxConnection` class to accept an optional `should_reload` parameter. This allows the method to check if a reload is necessary and exit the loop if so.
- Modified related tests to accommodate the new method signature.
- Changed logger calls from `critical` to `error` for consistency in logging severity.
- Added a new settings file for Claude with specific permissions for testing and code checks.

* Update parsedmarc/cli.py

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Update parsedmarc/cli.py

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* [WIP] SIGHUP-based configuration reload for watch mode (#698)

* Initial plan

* Fix reload state consistency, resource leaks, stale opts; add tests

Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
Agent-Logs-Url: https://github.com/domainaware/parsedmarc/sessions/3c2e0bb9-7e2d-4efa-aef6-d2b98478b921

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>

* [WIP] SIGHUP-based configuration reload for watch mode (#699)

* Initial plan

* Fix review comments: ConfigurationError wrapping, duplicate parse args, bool parsing, Kafka required topics, should_reload kwarg, SIGHUP test skips

Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
Agent-Logs-Url: https://github.com/domainaware/parsedmarc/sessions/0779003c-ccbe-4d76-9748-801dbc238b96

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>

* SIGHUP-based configuration reload: address review feedback (#700)

* Initial plan

* Address review feedback: kafka_ssl, duplicate silent, exception chain, log file reload, should_reload timing

Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
Agent-Logs-Url: https://github.com/domainaware/parsedmarc/sessions/a8a43c55-23fa-4471-abe6-7ac966f381f9

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>

* Update parsedmarc/cli.py

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Best-effort initialization for optional output clients in watch mode (#701)

* Initial plan

* Wrap optional output client init in try/except for best-effort initialization

Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
Agent-Logs-Url: https://github.com/domainaware/parsedmarc/sessions/59241d4e-1b05-4a92-b2d2-e6d13d10a4fd

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>

* Fix SIGHUP reload tight-loop in watch mode (#702)

* Initial plan

* Fix _reload_requested tight-loop: reset flag before reload to capture concurrent SIGHUPs

Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
Agent-Logs-Url: https://github.com/domainaware/parsedmarc/sessions/879d0bb1-9037-41f7-bc89-f59611956d2e

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>

* Update parsedmarc/cli.py

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Fix resource leak when HEC config is invalid in `_init_output_clients()` (#703)

* Initial plan

* Fix resource leak: validate HEC settings before creating any output clients

Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
Agent-Logs-Url: https://github.com/domainaware/parsedmarc/sessions/38c73e09-789d-4d41-b75e-bbc61418859d

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>

* Ensure SIGHUP never triggers a new email batch across all watch() implementations (#704)

* Initial plan

* Ensure SIGHUP never starts a new email batch in any watch() implementation

Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
Agent-Logs-Url: https://github.com/domainaware/parsedmarc/sessions/45d5be30-8f6b-4200-9bdd-15c655033f17

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>

* SIGHUP-based config reload for watch mode: address review feedback (#705)

* Initial plan

* Address review feedback: Kafka SSL context, SIGHUP handler safety, test formatting

Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
Agent-Logs-Url: https://github.com/domainaware/parsedmarc/sessions/8f2fd48f-32a4-4258-9a89-06f7c7ac29bf

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>

* Reverted changes by copilot that turned errors into warnings

* Enhance usage documentation for config reload: clarify behavior on successful reload and error handling

* Update CHANGELOG.md to reflect config reload enhancements

* Add pytest command to settings for silent output during testing

* Enhance resource management: add close methods for S3Client and HECClient, and improve IMAP connection handling during IDLE. Update CHANGELOG.md for config reload improvements and bug fixes.

* Update changelog to not include fixes within the same unreleased version

* Refactor changelog entries for clarity and consistency in configuration reload section

* Fix changelog entry for msgraph configuration check

* Update CHANGELOG.md

* Keep single list items on one line in the changelog instead of hard-wrapping them

* Remove incorrect IMAP changes

* Rename 'should_reload' parameter to 'config_reloading' in mailbox connection methods for clarity

* Restore startup configuration checks

* Improve error logging for Elasticsearch and OpenSearch exceptions

* Bump version to 9.3.0 in constants.py

* Refactor GelfClient methods to use specific report types instead of generic dicts

* Refactor tests to use assertions consistently and improve type hints

---------

Co-authored-by: Sean Whalen <seanthegeek@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <198982749+Copilot@users.noreply.github.com>
2026-03-21 16:14:48 -04:00
18 changed files with 1550 additions and 904 deletions

17
.claude/settings.json Normal file
View File

@@ -0,0 +1,17 @@
{
"permissions": {
"allow": [
"Bash(python -c \"import py_compile; py_compile.compile\\(''parsedmarc/cli.py'', doraise=True\\)\")",
"Bash(ruff check:*)",
"Bash(ruff format:*)",
"Bash(GITHUB_ACTIONS=true pytest --cov tests.py)",
"Bash(ls tests*)",
"Bash(GITHUB_ACTIONS=true python -m pytest --cov tests.py -x)",
"Bash(GITHUB_ACTIONS=true python -m pytest tests.py -x -v)",
"Bash(python -m pytest tests.py --no-header -q)"
],
"additionalDirectories": [
"/tmp"
]
}
}

View File

@@ -1,10 +1,28 @@
# Changelog
## 9.3.0
### Added
- SIGHUP-based configuration reload for watch mode — update output destinations, DNS/GeoIP settings, processing flags, and log level without restarting the service or interrupting in-progress report processing.
- Use `systemctl reload parsedmarc` when running under `systemd`.
- On a successful reload, old output clients are closed and recreated.
- On a failed reload, the previous configuration remains fully active.
- `close()` methods on `GelfClient`, `KafkaClient`, `SyslogClient`, `WebhookClient`, `HECClient`, and `S3Client` for clean resource teardown on reload.
- `config_reloading` parameter on all `MailboxConnection.watch()` implementations and `watch_inbox()` to ensure SIGHUP never triggers a new email batch mid-reload.
- Elasticsearch and OpenSearch connections are now tracked and cleaned up on reload via `_close_output_clients()`.
- Extracted `_parse_config_file()` and `_init_output_clients()` from `_main()` in `cli.py` to support config reload and reduce code duplication.
### Fixed
- `get_index_prefix()` crashed on forensic reports with a `TypeError` because it called `report()` instead of using `report[]` dict access.
- Missing `exit(1)` after IMAP user/password validation failure allowed execution to continue with `None` credentials.
## 9.2.1
### Added
- Better checking of `msconfig` configuration (PR #695)
- Better checking of `msgraph` configuration (PR #695)
### Changed

View File

@@ -404,6 +404,7 @@ The full set of configuration options are:
retry_attempts = 3
retry_delay = 5
```
- `gmail_api`
- `credentials_file` - str: Path to file containing the
credentials, None to disable (Default: `None`)
@@ -442,7 +443,7 @@ The full set of configuration options are:
- `dcr_smtp_tls_stream` - str: The stream name for the SMTP TLS reports in the DCR
:::{note}
Information regarding the setup of the Data Collection Rule can be found [here](https://learn.microsoft.com/en-us/azure/azure-monitor/logs/tutorial-logs-ingestion-portal).
Information regarding the setup of the Data Collection Rule can be found [in the Azure documentation](https://learn.microsoft.com/en-us/azure/azure-monitor/logs/tutorial-logs-ingestion-portal).
:::
- `gelf`
- `host` - str: The GELF server name or IP address
@@ -602,6 +603,7 @@ After=network.target network-online.target elasticsearch.service
[Service]
ExecStart=/opt/parsedmarc/venv/bin/parsedmarc -c /etc/parsedmarc.ini
ExecReload=/bin/kill -HUP $MAINPID
User=parsedmarc
Group=parsedmarc
Restart=always
@@ -634,6 +636,51 @@ sudo service parsedmarc restart
:::
### Reloading configuration without restarting
When running in watch mode, `parsedmarc` supports reloading its
configuration file without restarting the service or interrupting
report processing that is already in progress. Send a `SIGHUP` signal
to the process, or use `systemctl reload` if the unit file includes
the `ExecReload` line shown above:
```bash
sudo systemctl reload parsedmarc
```
The reload takes effect after the current batch of reports finishes
processing and all output operations (Elasticsearch, Kafka, S3, etc.)
for that batch have completed. The following settings are reloaded:
- All output destinations (Elasticsearch, OpenSearch, Kafka, S3,
Splunk, syslog, GELF, webhooks, Log Analytics)
- Multi-tenant index prefix domain map (`index_prefix_domain_map` —
the referenced YAML file is re-read on reload)
- DNS and GeoIP settings (`nameservers`, `dns_timeout`, `ip_db_path`,
`offline`, etc.)
- Processing flags (`strip_attachment_payloads`, `batch_size`,
`check_timeout`, etc.)
- Log level (`debug`, `verbose`, `warnings`, `silent`)
Mailbox connection settings (IMAP host/credentials, Microsoft Graph,
Gmail API, Maildir path) are **not** reloaded — changing those still
requires a full restart.
On a **successful** reload, existing output client connections are
closed and new ones are created from the updated configuration. The
service then resumes watching with the new settings.
If the new configuration file contains errors (missing required
settings, unreachable output destinations, etc.), the **entire reload
is aborted** — no output clients are replaced and the previous
configuration remains fully active. This means a typo in one section
will not take down an otherwise working setup. Check the logs for
details:
```bash
journalctl -u parsedmarc.service -r
```
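The all-or-nothing behavior described above can be sketched as follows. This is a simplified illustration under stated assumptions, not the project's implementation: `reload_clients()` and its `build_clients` argument are hypothetical names, and the only thing assumed about the clients is that each one exposes a `close()` method.

```python
import logging

logger = logging.getLogger(__name__)


def reload_clients(current_clients, build_clients):
    """Return the client dict to use after a reload attempt.

    build_clients is a hypothetical zero-argument callable that parses the
    new configuration and returns a dict of freshly created clients,
    raising an exception on any error.
    """
    try:
        new_clients = build_clients()
    except Exception:
        # Nothing has been closed or replaced yet, so the old set stays active.
        logger.exception("Config reload failed; keeping previous configuration")
        return current_clients

    # Only after the new set exists are the old connections released.
    for name, client in current_clients.items():
        try:
            client.close()
        except Exception:
            logger.warning("Failed to close %s", name, exc_info=True)
    return new_clients
```

Building the complete new set before touching the old one is what lets a failed reload leave the running service untouched.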
To check the status of the service, run:
```bash

View File

@@ -2195,6 +2195,7 @@ def watch_inbox(
batch_size: int = 10,
since: Optional[Union[datetime, date, str]] = None,
normalize_timespan_threshold_hours: float = 24,
config_reloading: Optional[Callable] = None,
):
"""
Watches the mailbox for new messages and
@@ -2222,6 +2223,8 @@ def watch_inbox(
batch_size (int): Number of messages to read and process before saving
since: Search for messages since certain time
normalize_timespan_threshold_hours (float): Normalize timespans beyond this
config_reloading: Optional callable that returns True when a config
reload has been requested (e.g. via SIGHUP)
"""
def check_callback(connection):
@@ -2246,7 +2249,14 @@ def watch_inbox(
)
callback(res)
mailbox_connection.watch(check_callback=check_callback, check_timeout=check_timeout)
watch_kwargs: dict = {
"check_callback": check_callback,
"check_timeout": check_timeout,
}
if config_reloading is not None:
watch_kwargs["config_reloading"] = config_reloading
mailbox_connection.watch(**watch_kwargs)
def append_json(

File diff suppressed because it is too large

View File

@@ -1,3 +1,3 @@
__version__ = "9.2.1"
__version__ = "9.3.0"
USER_AGENT = f"parsedmarc/{__version__}"

View File

@@ -3,9 +3,7 @@
from __future__ import annotations
import logging
import logging.handlers
import threading
from typing import Any
from pygelf import GelfTcpHandler, GelfTlsHandler, GelfUdpHandler
@@ -14,6 +12,7 @@ from parsedmarc import (
parsed_forensic_reports_to_csv_rows,
parsed_smtp_tls_reports_to_csv_rows,
)
from parsedmarc.types import AggregateReport, ForensicReport, SMTPTLSReport
log_context_data = threading.local()
@@ -37,7 +36,7 @@ class GelfClient(object):
"""
self.host = host
self.port = port
self.logger = logging.getLogger("parsedmarc_syslog")
self.logger = logging.getLogger("parsedmarc_gelf")
self.logger.setLevel(logging.INFO)
self.logger.addFilter(ContextFilter())
self.gelf_mode = {
@@ -50,7 +49,7 @@ class GelfClient(object):
)
self.logger.addHandler(self.handler)
def save_aggregate_report_to_gelf(self, aggregate_reports: list[dict[str, Any]]):
def save_aggregate_report_to_gelf(self, aggregate_reports: list[AggregateReport]):
rows = parsed_aggregate_reports_to_csv_rows(aggregate_reports)
for row in rows:
log_context_data.parsedmarc = row
@@ -58,14 +57,19 @@ class GelfClient(object):
log_context_data.parsedmarc = None
def save_forensic_report_to_gelf(self, forensic_reports: list[dict[str, Any]]):
def save_forensic_report_to_gelf(self, forensic_reports: list[ForensicReport]):
rows = parsed_forensic_reports_to_csv_rows(forensic_reports)
for row in rows:
log_context_data.parsedmarc = row
self.logger.info("parsedmarc forensic report")
def save_smtp_tls_report_to_gelf(self, smtp_tls_reports: dict[str, Any]):
def save_smtp_tls_report_to_gelf(self, smtp_tls_reports: SMTPTLSReport):
rows = parsed_smtp_tls_reports_to_csv_rows(smtp_tls_reports)
for row in rows:
log_context_data.parsedmarc = row
self.logger.info("parsedmarc smtptls report")
def close(self):
"""Remove and close the GELF handler, releasing its connection."""
self.logger.removeHandler(self.handler)
self.handler.close()

View File

@@ -62,6 +62,10 @@ class KafkaClient(object):
except NoBrokersAvailable:
raise KafkaError("No Kafka brokers available")
def close(self):
"""Close the Kafka producer, releasing background threads and sockets."""
self.producer.close()
@staticmethod
def strip_metadata(report: dict[str, Any]):
"""

View File

@@ -175,10 +175,14 @@ class GmailConnection(MailboxConnection):
# Not needed
pass
def watch(self, check_callback, check_timeout):
def watch(self, check_callback, check_timeout, config_reloading=None):
"""Checks the mailbox for new messages every n seconds"""
while True:
if config_reloading and config_reloading():
return
sleep(check_timeout)
if config_reloading and config_reloading():
return
check_callback(self)
@lru_cache(maxsize=10)

View File

@@ -278,10 +278,14 @@ class MSGraphConnection(MailboxConnection):
# Not needed
pass
def watch(self, check_callback, check_timeout):
def watch(self, check_callback, check_timeout, config_reloading=None):
"""Checks the mailbox for new messages every n seconds"""
while True:
if config_reloading and config_reloading():
return
sleep(check_timeout)
if config_reloading and config_reloading():
return
check_callback(self)
@lru_cache(maxsize=10)

View File

@@ -81,7 +81,7 @@ class IMAPConnection(MailboxConnection):
def keepalive(self):
self._client.noop()
def watch(self, check_callback, check_timeout):
def watch(self, check_callback, check_timeout, config_reloading=None):
"""
Use an IDLE IMAP connection to parse incoming emails,
and pass the results to a callback function
@@ -94,6 +94,8 @@ class IMAPConnection(MailboxConnection):
check_callback(self)
while True:
if config_reloading and config_reloading():
return
try:
IMAPClient(
host=self._client.host,
@@ -111,3 +113,5 @@ class IMAPConnection(MailboxConnection):
except Exception as e:
logger.warning("IMAP connection error. {0}. Reconnecting...".format(e))
sleep(check_timeout)
if config_reloading and config_reloading():
return

View File

@@ -28,5 +28,5 @@ class MailboxConnection(ABC):
def keepalive(self):
raise NotImplementedError
def watch(self, check_callback, check_timeout):
def watch(self, check_callback, check_timeout, config_reloading=None):
raise NotImplementedError

View File

@@ -63,10 +63,14 @@ class MaildirConnection(MailboxConnection):
def keepalive(self):
return
def watch(self, check_callback, check_timeout):
def watch(self, check_callback, check_timeout, config_reloading=None):
while True:
if config_reloading and config_reloading():
return
try:
check_callback(self)
except Exception as e:
logger.warning("Maildir init error. {0}".format(e))
if config_reloading and config_reloading():
return
sleep(check_timeout)

View File

@@ -93,3 +93,11 @@ class S3Client(object):
self.bucket.put_object(
Body=json.dumps(report), Key=object_path, Metadata=object_metadata
)
def close(self):
"""Clean up the boto3 resource."""
try:
if self.s3.meta is not None:
self.s3.meta.client.close()
except Exception:
pass

View File

@@ -207,3 +207,7 @@ class HECClient(object):
raise SplunkError(e.__str__())
if response["code"] != 0:
raise SplunkError(response["text"])
def close(self):
"""Close the underlying HTTP session."""
self.session.close()

View File

@@ -57,7 +57,7 @@ class SyslogClient(object):
self.logger.setLevel(logging.INFO)
# Create the appropriate syslog handler based on protocol
log_handler = self._create_syslog_handler(
self.log_handler = self._create_syslog_handler(
server_name,
server_port,
self.protocol,
@@ -69,7 +69,7 @@ class SyslogClient(object):
retry_delay,
)
self.logger.addHandler(log_handler)
self.logger.addHandler(self.log_handler)
def _create_syslog_handler(
self,
@@ -179,3 +179,8 @@ class SyslogClient(object):
rows = parsed_smtp_tls_reports_to_csv_rows(smtp_tls_reports)
for row in rows:
self.logger.info(json.dumps(row))
def close(self):
"""Remove and close the syslog handler, releasing its socket."""
self.logger.removeHandler(self.log_handler)
self.log_handler.close()

View File

@@ -63,3 +63,7 @@ class WebhookClient(object):
self.session.post(webhook_url, data=payload, timeout=self.timeout)
except Exception as error_:
logger.error("Webhook Error: {0}".format(error_.__str__()))
def close(self):
"""Close the underlying HTTP session."""
self.session.close()

374
tests.py
View File

@@ -4,6 +4,7 @@
from __future__ import absolute_import, print_function, unicode_literals
import os
import signal
import sys
import tempfile
import unittest
@@ -11,10 +12,11 @@ from base64 import urlsafe_b64encode
from glob import glob
from pathlib import Path
from tempfile import NamedTemporaryFile, TemporaryDirectory
from typing import cast
from types import SimpleNamespace
from unittest.mock import MagicMock, patch
from lxml import etree
from lxml import etree # type: ignore[import-untyped]
from googleapiclient.errors import HttpError
from httplib2 import Response
from imapclient.exceptions import IMAPClientError
@@ -31,6 +33,7 @@ from parsedmarc.mail.imap import IMAPConnection
import parsedmarc.mail.gmail as gmail_module
import parsedmarc.mail.graph as graph_module
import parsedmarc.mail.imap as imap_module
import parsedmarc.elastic
import parsedmarc.opensearch as opensearch_module
import parsedmarc.utils
@@ -153,7 +156,7 @@ class Test(unittest.TestCase):
report_path,
offline=True,
)
self.assertEqual(result["report_type"], "aggregate")
assert result["report_type"] == "aggregate"
self.assertEqual(result["report"]["report_metadata"]["org_name"], "outlook.com")
def testParseReportFileAcceptsPathForEmail(self):
@@ -164,7 +167,7 @@ class Test(unittest.TestCase):
report_path,
offline=True,
)
self.assertEqual(result["report_type"], "aggregate")
assert result["report_type"] == "aggregate"
self.assertEqual(result["report"]["report_metadata"]["org_name"], "google.com")
def testAggregateSamples(self):
@@ -175,10 +178,11 @@ class Test(unittest.TestCase):
if os.path.isdir(sample_path):
continue
print("Testing {0}: ".format(sample_path), end="")
parsed_report = parsedmarc.parse_report_file(
result = parsedmarc.parse_report_file(
sample_path, always_use_local_files=True, offline=OFFLINE_MODE
)["report"]
parsedmarc.parsed_aggregate_reports_to_csv(parsed_report)
)
assert result["report_type"] == "aggregate"
parsedmarc.parsed_aggregate_reports_to_csv(result["report"])
print("Passed!")
def testEmptySample(self):
@@ -194,13 +198,13 @@ class Test(unittest.TestCase):
print("Testing {0}: ".format(sample_path), end="")
with open(sample_path) as sample_file:
sample_content = sample_file.read()
parsed_report = parsedmarc.parse_report_email(
email_result = parsedmarc.parse_report_email(
sample_content, offline=OFFLINE_MODE
)["report"]
parsed_report = parsedmarc.parse_report_file(
sample_path, offline=OFFLINE_MODE
)["report"]
parsedmarc.parsed_forensic_reports_to_csv(parsed_report)
)
assert email_result["report_type"] == "forensic"
result = parsedmarc.parse_report_file(sample_path, offline=OFFLINE_MODE)
assert result["report_type"] == "forensic"
parsedmarc.parsed_forensic_reports_to_csv(result["report"])
print("Passed!")
def testSmtpTlsSamples(self):
@@ -211,10 +215,9 @@ class Test(unittest.TestCase):
if os.path.isdir(sample_path):
continue
print("Testing {0}: ".format(sample_path), end="")
parsed_report = parsedmarc.parse_report_file(
sample_path, offline=OFFLINE_MODE
)["report"]
parsedmarc.parsed_smtp_tls_reports_to_csv(parsed_report)
result = parsedmarc.parse_report_file(sample_path, offline=OFFLINE_MODE)
assert result["report_type"] == "smtp_tls"
parsedmarc.parsed_smtp_tls_reports_to_csv(result["report"])
print("Passed!")
def testOpenSearchSigV4RequiresRegion(self):
@@ -1277,7 +1280,7 @@ class TestMailboxWatchSince(unittest.TestCase):
def testWatchInboxPassesSinceToMailboxFetch(self):
mailbox_connection = SimpleNamespace()
def fake_watch(check_callback, check_timeout):
def fake_watch(check_callback, check_timeout, config_reloading=None):
check_callback(mailbox_connection)
raise _BreakLoop()
@@ -1288,7 +1291,9 @@ class TestMailboxWatchSince(unittest.TestCase):
) as mocked:
with self.assertRaises(_BreakLoop):
parsedmarc.watch_inbox(
mailbox_connection=mailbox_connection,
mailbox_connection=cast(
parsedmarc.MailboxConnection, mailbox_connection
),
callback=callback,
check_timeout=1,
batch_size=10,
@@ -1336,30 +1341,30 @@ since = 2d
self.assertEqual(mock_watch_inbox.call_args.kwargs.get("since"), "2d")
class _DummyMailboxConnection:
class _DummyMailboxConnection(parsedmarc.MailboxConnection):
def __init__(self):
self.fetch_calls = []
self.fetch_calls: list[dict[str, object]] = []
def create_folder(self, folder_name):
def create_folder(self, folder_name: str):
return None
def fetch_messages(self, reports_folder, **kwargs):
def fetch_messages(self, reports_folder: str, **kwargs):
self.fetch_calls.append({"reports_folder": reports_folder, **kwargs})
return []
def fetch_message(self, message_id, **kwargs):
def fetch_message(self, message_id) -> str:
return ""
def delete_message(self, message_id):
return None
def move_message(self, message_id, folder_name):
def move_message(self, message_id, folder_name: str):
return None
def keepalive(self):
return None
def watch(self, check_callback, check_timeout):
def watch(self, check_callback, check_timeout, config_reloading=None):
return None
@@ -1558,7 +1563,7 @@ class TestMSGraphFolderFallback(unittest.TestCase):
def testWellKnownFolderFallback(self):
connection = MSGraphConnection.__new__(MSGraphConnection)
connection.mailbox_name = "shared@example.com"
connection._client = _FakeGraphClient()
connection._client = _FakeGraphClient() # type: ignore[assignment]
connection._request_with_retries = MagicMock(
side_effect=lambda method_name, *args, **kwargs: getattr(
connection._client, method_name
@@ -1578,7 +1583,7 @@ class TestMSGraphFolderFallback(unittest.TestCase):
def testUnknownFolderStillFails(self):
connection = MSGraphConnection.__new__(MSGraphConnection)
connection.mailbox_name = "shared@example.com"
connection._client = _FakeGraphClient()
connection._client = _FakeGraphClient() # type: ignore[assignment]
connection._request_with_retries = MagicMock(
side_effect=lambda method_name, *args, **kwargs: getattr(
connection._client, method_name
@@ -1910,5 +1915,320 @@ certificate_path = /tmp/msgraph-cert.pem
mock_get_mailbox_reports.assert_not_called()
class TestSighupReload(unittest.TestCase):
"""Tests for SIGHUP-driven configuration reload in watch mode."""
_BASE_CONFIG = """[general]
silent = true
[imap]
host = imap.example.com
user = user
password = pass
[mailbox]
watch = true
"""
@unittest.skipUnless(
hasattr(signal, "SIGHUP"),
"SIGHUP not available on this platform",
)
@patch("parsedmarc.cli._init_output_clients")
@patch("parsedmarc.cli._parse_config_file")
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
@patch("parsedmarc.cli.watch_inbox")
@patch("parsedmarc.cli.IMAPConnection")
def testSighupTriggersReloadAndWatchRestarts(
self,
mock_imap,
mock_watch,
mock_get_reports,
mock_parse_config,
mock_init_clients,
):
"""SIGHUP causes watch to return, config is re-parsed, and watch restarts."""
import signal as signal_module
mock_imap.return_value = object()
mock_get_reports.return_value = {
"aggregate_reports": [],
"forensic_reports": [],
"smtp_tls_reports": [],
}
def parse_side_effect(config_file, opts):
opts.imap_host = "imap.example.com"
opts.imap_user = "user"
opts.imap_password = "pass"
opts.mailbox_watch = True
return None
mock_parse_config.side_effect = parse_side_effect
mock_init_clients.return_value = {}
call_count = [0]
def watch_side_effect(*args, **kwargs):
call_count[0] += 1
if call_count[0] == 1:
# Simulate SIGHUP arriving while watch is running
if hasattr(signal_module, "SIGHUP"):
import os
os.kill(os.getpid(), signal_module.SIGHUP)
return # Normal return — reload loop will continue
else:
raise FileExistsError("stop-watch-loop")
mock_watch.side_effect = watch_side_effect
with tempfile.NamedTemporaryFile("w", suffix=".ini", delete=False) as cfg:
cfg.write(self._BASE_CONFIG)
cfg_path = cfg.name
self.addCleanup(lambda: os.path.exists(cfg_path) and os.remove(cfg_path))
with patch.object(sys, "argv", ["parsedmarc", "-c", cfg_path]):
with self.assertRaises(SystemExit) as cm:
parsedmarc.cli._main()
# Exited with code 1 (from FileExistsError handler)
self.assertEqual(cm.exception.code, 1)
# watch_inbox was called twice: initial run + after reload
self.assertEqual(mock_watch.call_count, 2)
# _parse_config_file called for initial load + reload
self.assertGreaterEqual(mock_parse_config.call_count, 2)
@unittest.skipUnless(
hasattr(signal, "SIGHUP"),
"SIGHUP not available on this platform",
)
@patch("parsedmarc.cli._init_output_clients")
@patch("parsedmarc.cli._parse_config_file")
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
@patch("parsedmarc.cli.watch_inbox")
@patch("parsedmarc.cli.IMAPConnection")
def testInvalidConfigOnReloadKeepsPreviousState(
self,
mock_imap,
mock_watch,
mock_get_reports,
mock_parse_config,
mock_init_clients,
):
"""A failing reload leaves opts and clients unchanged."""
import signal as signal_module
mock_imap.return_value = object()
mock_get_reports.return_value = {
"aggregate_reports": [],
"forensic_reports": [],
"smtp_tls_reports": [],
}
# Initial parse sets required opts; reload parse raises
initial_map = {"prefix_": ["example.com"]}
call_count = [0]
def parse_side_effect(config_file, opts):
call_count[0] += 1
opts.imap_host = "imap.example.com"
opts.imap_user = "user"
opts.imap_password = "pass"
opts.mailbox_watch = True
if call_count[0] == 1:
return initial_map
raise RuntimeError("bad config")
mock_parse_config.side_effect = parse_side_effect
initial_clients = {"s3_client": MagicMock()}
mock_init_clients.return_value = initial_clients
watch_calls = [0]
def watch_side_effect(*args, **kwargs):
watch_calls[0] += 1
if watch_calls[0] == 1:
if hasattr(signal_module, "SIGHUP"):
import os
os.kill(os.getpid(), signal_module.SIGHUP)
return
else:
raise FileExistsError("stop")
mock_watch.side_effect = watch_side_effect
with tempfile.NamedTemporaryFile("w", suffix=".ini", delete=False) as cfg:
cfg.write(self._BASE_CONFIG)
cfg_path = cfg.name
self.addCleanup(lambda: os.path.exists(cfg_path) and os.remove(cfg_path))
with patch.object(sys, "argv", ["parsedmarc", "-c", cfg_path]):
with self.assertRaises(SystemExit) as cm:
parsedmarc.cli._main()
self.assertEqual(cm.exception.code, 1)
# watch was still called twice (reload loop continued after failed reload)
self.assertEqual(mock_watch.call_count, 2)
# The failed reload must not have closed the original clients
initial_clients["s3_client"].close.assert_not_called()
@unittest.skipUnless(
hasattr(signal, "SIGHUP"),
"SIGHUP not available on this platform",
)
@patch("parsedmarc.cli._init_output_clients")
@patch("parsedmarc.cli._parse_config_file")
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
@patch("parsedmarc.cli.watch_inbox")
@patch("parsedmarc.cli.IMAPConnection")
def testReloadClosesOldClients(
self,
mock_imap,
mock_watch,
mock_get_reports,
mock_parse_config,
mock_init_clients,
):
"""Successful reload closes the old output clients before replacing them."""
import signal as signal_module
mock_imap.return_value = object()
mock_get_reports.return_value = {
"aggregate_reports": [],
"forensic_reports": [],
"smtp_tls_reports": [],
}
def parse_side_effect(config_file, opts):
opts.imap_host = "imap.example.com"
opts.imap_user = "user"
opts.imap_password = "pass"
opts.mailbox_watch = True
return None
mock_parse_config.side_effect = parse_side_effect
old_client = MagicMock()
new_client = MagicMock()
init_call = [0]
def init_side_effect(opts):
init_call[0] += 1
if init_call[0] == 1:
return {"kafka_client": old_client}
return {"kafka_client": new_client}
mock_init_clients.side_effect = init_side_effect
watch_calls = [0]
def watch_side_effect(*args, **kwargs):
watch_calls[0] += 1
if watch_calls[0] == 1:
if hasattr(signal_module, "SIGHUP"):
import os
os.kill(os.getpid(), signal_module.SIGHUP)
return
else:
raise FileExistsError("stop")
mock_watch.side_effect = watch_side_effect
with tempfile.NamedTemporaryFile("w", suffix=".ini", delete=False) as cfg:
cfg.write(self._BASE_CONFIG)
cfg_path = cfg.name
self.addCleanup(lambda: os.path.exists(cfg_path) and os.remove(cfg_path))
with patch.object(sys, "argv", ["parsedmarc", "-c", cfg_path]):
with self.assertRaises(SystemExit):
parsedmarc.cli._main()
# Old client must have been closed when reload succeeded
old_client.close.assert_called_once()
@unittest.skipUnless(
hasattr(signal, "SIGHUP"),
"SIGHUP not available on this platform",
)
@patch("parsedmarc.cli._init_output_clients")
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
@patch("parsedmarc.cli.watch_inbox")
@patch("parsedmarc.cli.IMAPConnection")
def testRemovedConfigSectionTakesEffectOnReload(
self,
mock_imap,
mock_watch,
mock_get_reports,
mock_init_clients,
):
"""Removing a config section on reload resets that option to its default."""
import signal as signal_module
mock_imap.return_value = object()
mock_get_reports.return_value = {
"aggregate_reports": [],
"forensic_reports": [],
"smtp_tls_reports": [],
}
mock_init_clients.return_value = {}
# First config sets kafka_hosts (with required topics); second removes it.
config_v1 = (
self._BASE_CONFIG
+ "\n[kafka]\nhosts = kafka.example.com:9092\n"
+ "aggregate_topic = dmarc_agg\n"
+ "forensic_topic = dmarc_forensic\n"
+ "smtp_tls_topic = smtp_tls\n"
)
config_v2 = self._BASE_CONFIG # no [kafka] section
with tempfile.NamedTemporaryFile("w", suffix=".ini", delete=False) as cfg:
cfg.write(config_v1)
cfg_path = cfg.name
self.addCleanup(lambda: os.path.exists(cfg_path) and os.remove(cfg_path))
watch_calls = [0]
def watch_side_effect(*args, **kwargs):
watch_calls[0] += 1
if watch_calls[0] == 1:
# Rewrite config to remove kafka before triggering reload
with open(cfg_path, "w") as f:
f.write(config_v2)
if hasattr(signal_module, "SIGHUP"):
import os
os.kill(os.getpid(), signal_module.SIGHUP)
return
else:
raise FileExistsError("stop")
mock_watch.side_effect = watch_side_effect
# Capture opts used on each _init_output_clients call
init_opts_captures = []
def init_side_effect(opts):
from argparse import Namespace as NS
init_opts_captures.append(NS(**vars(opts)))
return {}
mock_init_clients.side_effect = init_side_effect
with patch.object(sys, "argv", ["parsedmarc", "-c", cfg_path]):
with self.assertRaises(SystemExit):
parsedmarc.cli._main()
# First init: kafka_hosts should be set from v1 config
self.assertIsNotNone(init_opts_captures[0].kafka_hosts)
# Second init (after reload with v2 config): kafka_hosts should be None
self.assertIsNone(init_opts_captures[1].kafka_hosts)
if __name__ == "__main__":
unittest.main(verbosity=2)