mirror of
https://github.com/domainaware/parsedmarc.git
synced 2026-03-21 22:12:45 +00:00
Compare commits
2 Commits
copilot/su
...
9.3.0
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9849598100 | ||
|
|
e82f3e58a1 |
@@ -7,7 +7,8 @@
|
||||
"Bash(GITHUB_ACTIONS=true pytest --cov tests.py)",
|
||||
"Bash(ls tests*)",
|
||||
"Bash(GITHUB_ACTIONS=true python -m pytest --cov tests.py -x)",
|
||||
"Bash(GITHUB_ACTIONS=true python -m pytest tests.py -x -v)"
|
||||
"Bash(GITHUB_ACTIONS=true python -m pytest tests.py -x -v)",
|
||||
"Bash(python -m pytest tests.py --no-header -q)"
|
||||
],
|
||||
"additionalDirectories": [
|
||||
"/tmp"
|
||||
|
||||
23
CHANGELOG.md
23
CHANGELOG.md
@@ -4,20 +4,25 @@
|
||||
|
||||
### Added
|
||||
|
||||
- SIGHUP-based configuration reload for watch mode — update output
|
||||
destinations, DNS/GeoIP settings, processing flags, and log level
|
||||
without restarting the service or interrupting in-progress report
|
||||
processing. Use `systemctl reload parsedmarc` when running under
|
||||
systemd.
|
||||
- Extracted `_parse_config_file()` and `_init_output_clients()` from
|
||||
`_main()` in `cli.py` to support config reload and reduce code
|
||||
duplication.
|
||||
- SIGHUP-based configuration reload for watch mode — update output destinations, DNS/GeoIP settings, processing flags, and log level without restarting the service or interrupting in-progress report processing.
|
||||
- Use `systemctl reload parsedmarc` when running under `systemd`.
|
||||
- On a successful reload, old output clients are closed and recreated.
|
||||
- On a failed reload, the previous configuration remains fully active.
|
||||
- `close()` methods on `GelfClient`, `KafkaClient`, `SyslogClient`, `WebhookClient`, HECClient, and `S3Client` for clean resource teardown on reload.
|
||||
- `config_reloading` parameter on all `MailboxConnection.watch()` implementations and `watch_inbox()` to ensure SIGHUP never triggers a new email batch mid-reload.
|
||||
- Elasticsearch and OpenSearch connections are now tracked and cleaned up on reload via `_close_output_clients()`.
|
||||
- Extracted `_parse_config_file()` and `_init_output_clients()` from `_main()` in `cli.py` to support config reload and reduce code duplication.
|
||||
|
||||
### Fixed
|
||||
|
||||
- `get_index_prefix()` crashed on forensic reports with `TypeError` due to `report()` instead of `report[]` dict access.
|
||||
- Missing `exit(1)` after IMAP user/password validation failure allowed execution to continue with `None` credentials.
|
||||
|
||||
## 9.2.1
|
||||
|
||||
### Added
|
||||
|
||||
- Better checking of `msconfig` configuration (PR #695)
|
||||
- Better checking of `msgraph` configuration (PR #695)
|
||||
|
||||
### Changed
|
||||
|
||||
|
||||
@@ -666,8 +666,15 @@ Mailbox connection settings (IMAP host/credentials, Microsoft Graph,
|
||||
Gmail API, Maildir path) are **not** reloaded — changing those still
|
||||
requires a full restart.
|
||||
|
||||
If the new configuration file contains errors, the reload is aborted
|
||||
and the previous configuration remains active. Check the logs for
|
||||
On a **successful** reload, existing output client connections are
|
||||
closed and new ones are created from the updated configuration. The
|
||||
service then resumes watching with the new settings.
|
||||
|
||||
If the new configuration file contains errors (missing required
|
||||
settings, unreachable output destinations, etc.), the **entire reload
|
||||
is aborted** — no output clients are replaced and the previous
|
||||
configuration remains fully active. This means a typo in one section
|
||||
will not take down an otherwise working setup. Check the logs for
|
||||
details:
|
||||
|
||||
```bash
|
||||
|
||||
@@ -2195,7 +2195,7 @@ def watch_inbox(
|
||||
batch_size: int = 10,
|
||||
since: Optional[Union[datetime, date, str]] = None,
|
||||
normalize_timespan_threshold_hours: float = 24,
|
||||
should_reload: Optional[Callable] = None,
|
||||
config_reloading: Optional[Callable] = None,
|
||||
):
|
||||
"""
|
||||
Watches the mailbox for new messages and
|
||||
@@ -2223,7 +2223,7 @@ def watch_inbox(
|
||||
batch_size (int): Number of messages to read and process before saving
|
||||
since: Search for messages since certain time
|
||||
normalize_timespan_threshold_hours (float): Normalize timespans beyond this
|
||||
should_reload: Optional callable that returns True when a config
|
||||
config_reloading: Optional callable that returns True when a config
|
||||
reload has been requested (e.g. via SIGHUP)
|
||||
"""
|
||||
|
||||
@@ -2253,8 +2253,8 @@ def watch_inbox(
|
||||
"check_callback": check_callback,
|
||||
"check_timeout": check_timeout,
|
||||
}
|
||||
if should_reload is not None:
|
||||
watch_kwargs["should_reload"] = should_reload
|
||||
if config_reloading is not None:
|
||||
watch_kwargs["config_reloading"] = config_reloading
|
||||
|
||||
mailbox_connection.watch(**watch_kwargs)
|
||||
|
||||
|
||||
@@ -201,20 +201,8 @@ def _parse_config_file(config_file, opts):
|
||||
"normalize_timespan_threshold_hours"
|
||||
)
|
||||
if "index_prefix_domain_map" in general_config:
|
||||
map_path = general_config["index_prefix_domain_map"]
|
||||
try:
|
||||
with open(map_path) as f:
|
||||
index_prefix_domain_map = yaml.safe_load(f)
|
||||
except OSError as exc:
|
||||
raise ConfigurationError(
|
||||
"Failed to read index_prefix_domain_map file "
|
||||
"'{0}': {1}".format(map_path, exc)
|
||||
) from exc
|
||||
except yaml.YAMLError as exc:
|
||||
raise ConfigurationError(
|
||||
"Failed to parse YAML in index_prefix_domain_map "
|
||||
"file '{0}': {1}".format(map_path, exc)
|
||||
) from exc
|
||||
with open(general_config["index_prefix_domain_map"]) as f:
|
||||
index_prefix_domain_map = yaml.safe_load(f)
|
||||
if "offline" in general_config:
|
||||
opts.offline = bool(general_config.getboolean("offline"))
|
||||
if "strip_attachment_payloads" in general_config:
|
||||
@@ -254,7 +242,7 @@ def _parse_config_file(config_file, opts):
|
||||
except Exception as ns_error:
|
||||
raise ConfigurationError(
|
||||
"DNS pre-flight check failed: {}".format(ns_error)
|
||||
)
|
||||
) from ns_error
|
||||
if not dummy_hostname:
|
||||
raise ConfigurationError(
|
||||
"DNS pre-flight check failed: no PTR record for {} from {}".format(
|
||||
@@ -271,8 +259,6 @@ def _parse_config_file(config_file, opts):
|
||||
opts.debug = bool(general_config.getboolean("debug"))
|
||||
if "verbose" in general_config:
|
||||
opts.verbose = bool(general_config.getboolean("verbose"))
|
||||
if "silent" in general_config:
|
||||
opts.silent = bool(general_config.getboolean("silent"))
|
||||
if "warnings" in general_config:
|
||||
opts.warnings = bool(general_config.getboolean("warnings"))
|
||||
if "fail_on_output_error" in general_config:
|
||||
@@ -761,15 +747,15 @@ def _parse_config_file(config_file, opts):
|
||||
if "oauth2_port" in gmail_api_config:
|
||||
opts.gmail_api_oauth2_port = gmail_api_config.getint("oauth2_port", 8080)
|
||||
if "auth_mode" in gmail_api_config:
|
||||
opts.gmail_api_auth_mode = gmail_api_config.get("auth_mode").strip()
|
||||
opts.gmail_api_auth_mode = gmail_api_config["auth_mode"].strip()
|
||||
if "service_account_user" in gmail_api_config:
|
||||
opts.gmail_api_service_account_user = gmail_api_config.get(
|
||||
opts.gmail_api_service_account_user = gmail_api_config[
|
||||
"service_account_user"
|
||||
).strip()
|
||||
].strip()
|
||||
elif "delegated_user" in gmail_api_config:
|
||||
opts.gmail_api_service_account_user = gmail_api_config.get(
|
||||
opts.gmail_api_service_account_user = gmail_api_config[
|
||||
"delegated_user"
|
||||
).strip()
|
||||
].strip()
|
||||
|
||||
if "maildir" in config.sections():
|
||||
maildir_api_config = config["maildir"]
|
||||
@@ -824,6 +810,38 @@ def _parse_config_file(config_file, opts):
|
||||
return index_prefix_domain_map
|
||||
|
||||
|
||||
class _ElasticsearchHandle:
|
||||
"""Sentinel so Elasticsearch participates in _close_output_clients."""
|
||||
|
||||
def close(self):
|
||||
try:
|
||||
conn = elastic.connections.get_connection()
|
||||
if not isinstance(conn, str):
|
||||
conn.close()
|
||||
except Exception:
|
||||
pass
|
||||
try:
|
||||
elastic.connections.remove_connection("default")
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
class _OpenSearchHandle:
|
||||
"""Sentinel so OpenSearch participates in _close_output_clients."""
|
||||
|
||||
def close(self):
|
||||
try:
|
||||
conn = opensearch.connections.get_connection()
|
||||
if not isinstance(conn, str):
|
||||
conn.close()
|
||||
except Exception:
|
||||
pass
|
||||
try:
|
||||
opensearch.connections.remove_connection("default")
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
def _init_output_clients(opts):
|
||||
"""Create output clients based on current opts.
|
||||
|
||||
@@ -835,76 +853,6 @@ def _init_output_clients(opts):
|
||||
"""
|
||||
clients = {}
|
||||
|
||||
if opts.save_aggregate or opts.save_forensic or opts.save_smtp_tls:
|
||||
if opts.elasticsearch_hosts:
|
||||
es_aggregate_index = "dmarc_aggregate"
|
||||
es_forensic_index = "dmarc_forensic"
|
||||
es_smtp_tls_index = "smtp_tls"
|
||||
if opts.elasticsearch_index_suffix:
|
||||
suffix = opts.elasticsearch_index_suffix
|
||||
es_aggregate_index = "{0}_{1}".format(es_aggregate_index, suffix)
|
||||
es_forensic_index = "{0}_{1}".format(es_forensic_index, suffix)
|
||||
es_smtp_tls_index = "{0}_{1}".format(es_smtp_tls_index, suffix)
|
||||
if opts.elasticsearch_index_prefix:
|
||||
prefix = opts.elasticsearch_index_prefix
|
||||
es_aggregate_index = "{0}{1}".format(prefix, es_aggregate_index)
|
||||
es_forensic_index = "{0}{1}".format(prefix, es_forensic_index)
|
||||
es_smtp_tls_index = "{0}{1}".format(prefix, es_smtp_tls_index)
|
||||
elastic_timeout_value = (
|
||||
float(opts.elasticsearch_timeout)
|
||||
if opts.elasticsearch_timeout is not None
|
||||
else 60.0
|
||||
)
|
||||
elastic.set_hosts(
|
||||
opts.elasticsearch_hosts,
|
||||
use_ssl=opts.elasticsearch_ssl,
|
||||
ssl_cert_path=opts.elasticsearch_ssl_cert_path,
|
||||
username=opts.elasticsearch_username,
|
||||
password=opts.elasticsearch_password,
|
||||
api_key=opts.elasticsearch_api_key,
|
||||
timeout=elastic_timeout_value,
|
||||
)
|
||||
elastic.migrate_indexes(
|
||||
aggregate_indexes=[es_aggregate_index],
|
||||
forensic_indexes=[es_forensic_index],
|
||||
)
|
||||
|
||||
if opts.opensearch_hosts:
|
||||
os_aggregate_index = "dmarc_aggregate"
|
||||
os_forensic_index = "dmarc_forensic"
|
||||
os_smtp_tls_index = "smtp_tls"
|
||||
if opts.opensearch_index_suffix:
|
||||
suffix = opts.opensearch_index_suffix
|
||||
os_aggregate_index = "{0}_{1}".format(os_aggregate_index, suffix)
|
||||
os_forensic_index = "{0}_{1}".format(os_forensic_index, suffix)
|
||||
os_smtp_tls_index = "{0}_{1}".format(os_smtp_tls_index, suffix)
|
||||
if opts.opensearch_index_prefix:
|
||||
prefix = opts.opensearch_index_prefix
|
||||
os_aggregate_index = "{0}{1}".format(prefix, os_aggregate_index)
|
||||
os_forensic_index = "{0}{1}".format(prefix, os_forensic_index)
|
||||
os_smtp_tls_index = "{0}{1}".format(prefix, os_smtp_tls_index)
|
||||
opensearch_timeout_value = (
|
||||
float(opts.opensearch_timeout)
|
||||
if opts.opensearch_timeout is not None
|
||||
else 60.0
|
||||
)
|
||||
opensearch.set_hosts(
|
||||
opts.opensearch_hosts,
|
||||
use_ssl=opts.opensearch_ssl,
|
||||
ssl_cert_path=opts.opensearch_ssl_cert_path,
|
||||
username=opts.opensearch_username,
|
||||
password=opts.opensearch_password,
|
||||
api_key=opts.opensearch_api_key,
|
||||
timeout=opensearch_timeout_value,
|
||||
auth_type=opts.opensearch_auth_type,
|
||||
aws_region=opts.opensearch_aws_region,
|
||||
aws_service=opts.opensearch_aws_service,
|
||||
)
|
||||
opensearch.migrate_indexes(
|
||||
aggregate_indexes=[os_aggregate_index],
|
||||
forensic_indexes=[os_forensic_index],
|
||||
)
|
||||
|
||||
if opts.s3_bucket:
|
||||
clients["s3_client"] = s3.S3Client(
|
||||
bucket_name=opts.s3_bucket,
|
||||
@@ -977,6 +925,83 @@ def _init_output_clients(opts):
|
||||
timeout=opts.webhook_timeout,
|
||||
)
|
||||
|
||||
# Elasticsearch and OpenSearch mutate module-level global state via
|
||||
# connections.create_connection(), which cannot be rolled back if a later
|
||||
# step fails. Initialise them last so that all other clients are created
|
||||
# successfully first; this minimises the window for partial-init problems
|
||||
# during config reload.
|
||||
if opts.save_aggregate or opts.save_forensic or opts.save_smtp_tls:
|
||||
if opts.elasticsearch_hosts:
|
||||
es_aggregate_index = "dmarc_aggregate"
|
||||
es_forensic_index = "dmarc_forensic"
|
||||
es_smtp_tls_index = "smtp_tls"
|
||||
if opts.elasticsearch_index_suffix:
|
||||
suffix = opts.elasticsearch_index_suffix
|
||||
es_aggregate_index = "{0}_{1}".format(es_aggregate_index, suffix)
|
||||
es_forensic_index = "{0}_{1}".format(es_forensic_index, suffix)
|
||||
es_smtp_tls_index = "{0}_{1}".format(es_smtp_tls_index, suffix)
|
||||
if opts.elasticsearch_index_prefix:
|
||||
prefix = opts.elasticsearch_index_prefix
|
||||
es_aggregate_index = "{0}{1}".format(prefix, es_aggregate_index)
|
||||
es_forensic_index = "{0}{1}".format(prefix, es_forensic_index)
|
||||
es_smtp_tls_index = "{0}{1}".format(prefix, es_smtp_tls_index)
|
||||
elastic_timeout_value = (
|
||||
float(opts.elasticsearch_timeout)
|
||||
if opts.elasticsearch_timeout is not None
|
||||
else 60.0
|
||||
)
|
||||
elastic.set_hosts(
|
||||
opts.elasticsearch_hosts,
|
||||
use_ssl=opts.elasticsearch_ssl,
|
||||
ssl_cert_path=opts.elasticsearch_ssl_cert_path,
|
||||
username=opts.elasticsearch_username,
|
||||
password=opts.elasticsearch_password,
|
||||
api_key=opts.elasticsearch_api_key,
|
||||
timeout=elastic_timeout_value,
|
||||
)
|
||||
elastic.migrate_indexes(
|
||||
aggregate_indexes=[es_aggregate_index],
|
||||
forensic_indexes=[es_forensic_index],
|
||||
)
|
||||
clients["elasticsearch"] = _ElasticsearchHandle()
|
||||
|
||||
if opts.opensearch_hosts:
|
||||
os_aggregate_index = "dmarc_aggregate"
|
||||
os_forensic_index = "dmarc_forensic"
|
||||
os_smtp_tls_index = "smtp_tls"
|
||||
if opts.opensearch_index_suffix:
|
||||
suffix = opts.opensearch_index_suffix
|
||||
os_aggregate_index = "{0}_{1}".format(os_aggregate_index, suffix)
|
||||
os_forensic_index = "{0}_{1}".format(os_forensic_index, suffix)
|
||||
os_smtp_tls_index = "{0}_{1}".format(os_smtp_tls_index, suffix)
|
||||
if opts.opensearch_index_prefix:
|
||||
prefix = opts.opensearch_index_prefix
|
||||
os_aggregate_index = "{0}{1}".format(prefix, os_aggregate_index)
|
||||
os_forensic_index = "{0}{1}".format(prefix, os_forensic_index)
|
||||
os_smtp_tls_index = "{0}{1}".format(prefix, os_smtp_tls_index)
|
||||
opensearch_timeout_value = (
|
||||
float(opts.opensearch_timeout)
|
||||
if opts.opensearch_timeout is not None
|
||||
else 60.0
|
||||
)
|
||||
opensearch.set_hosts(
|
||||
opts.opensearch_hosts,
|
||||
use_ssl=opts.opensearch_ssl,
|
||||
ssl_cert_path=opts.opensearch_ssl_cert_path,
|
||||
username=opts.opensearch_username,
|
||||
password=opts.opensearch_password,
|
||||
api_key=opts.opensearch_api_key,
|
||||
timeout=opensearch_timeout_value,
|
||||
auth_type=opts.opensearch_auth_type,
|
||||
aws_region=opts.opensearch_aws_region,
|
||||
aws_service=opts.opensearch_aws_service,
|
||||
)
|
||||
opensearch.migrate_indexes(
|
||||
aggregate_indexes=[os_aggregate_index],
|
||||
forensic_indexes=[os_forensic_index],
|
||||
)
|
||||
clients["opensearch"] = _OpenSearchHandle()
|
||||
|
||||
return clients
|
||||
|
||||
|
||||
@@ -1007,7 +1032,7 @@ def _main():
|
||||
if "policy_published" in report:
|
||||
domain = report["policy_published"]["domain"]
|
||||
elif "reported_domain" in report:
|
||||
domain = report("reported_domain")
|
||||
domain = report["reported_domain"]
|
||||
elif "policies" in report:
|
||||
domain = report["policies"][0]["domain"]
|
||||
if domain:
|
||||
@@ -1602,7 +1627,7 @@ def _main():
|
||||
try:
|
||||
index_prefix_domain_map = _parse_config_file(args.config_file, opts)
|
||||
except ConfigurationError as e:
|
||||
logger.error(str(e))
|
||||
logger.critical(str(e))
|
||||
exit(-1)
|
||||
|
||||
logger.setLevel(logging.ERROR)
|
||||
@@ -1624,6 +1649,8 @@ def _main():
|
||||
except Exception as error:
|
||||
logger.warning("Unable to write to log file: {}".format(error))
|
||||
|
||||
opts.active_log_file = opts.log_file
|
||||
|
||||
if (
|
||||
opts.imap_host is None
|
||||
and opts.graph_client_id is None
|
||||
@@ -1639,14 +1666,14 @@ def _main():
|
||||
# Initialize output clients
|
||||
try:
|
||||
clients = _init_output_clients(opts)
|
||||
except elastic.ElasticsearchError:
|
||||
logger.exception("Elasticsearch Error")
|
||||
except elastic.ElasticsearchError as e:
|
||||
logger.exception("Elasticsearch Error: {0}".format(e))
|
||||
exit(1)
|
||||
except opensearch.OpenSearchError:
|
||||
logger.exception("OpenSearch Error")
|
||||
except opensearch.OpenSearchError as e:
|
||||
logger.exception("OpenSearch Error: {0}".format(e))
|
||||
exit(1)
|
||||
except ConfigurationError as e:
|
||||
logger.error(str(e))
|
||||
logger.critical(str(e))
|
||||
exit(1)
|
||||
except Exception as error_:
|
||||
logger.error("Output client error: {0}".format(error_))
|
||||
@@ -1782,8 +1809,9 @@ def _main():
|
||||
try:
|
||||
if opts.imap_user is None or opts.imap_password is None:
|
||||
logger.error(
|
||||
"IMAP user and password must be specified ifhost is specified"
|
||||
"IMAP user and password must be specified if host is specified"
|
||||
)
|
||||
exit(1)
|
||||
|
||||
ssl = True
|
||||
verify = True
|
||||
@@ -1960,8 +1988,9 @@ def _main():
|
||||
|
||||
def _handle_sighup(signum, frame):
|
||||
nonlocal _reload_requested
|
||||
# Logging is not async-signal-safe; only set the flag here.
|
||||
# The log message is emitted from the main loop when the flag is read.
|
||||
_reload_requested = True
|
||||
logger.info("SIGHUP received, config will reload after current batch")
|
||||
|
||||
if hasattr(signal, "SIGHUP"):
|
||||
signal.signal(signal.SIGHUP, _handle_sighup)
|
||||
@@ -1970,7 +1999,12 @@ def _main():
|
||||
logger.info("Watching for email - Quit with ctrl-c")
|
||||
|
||||
while True:
|
||||
_reload_requested = False
|
||||
# Re-check mailbox_watch in case a config reload disabled watch mode
|
||||
if not opts.mailbox_watch:
|
||||
logger.info(
|
||||
"Mailbox watch disabled in reloaded configuration, stopping watcher"
|
||||
)
|
||||
break
|
||||
try:
|
||||
watch_inbox(
|
||||
mailbox_connection=mailbox_connection,
|
||||
@@ -1991,7 +2025,7 @@ def _main():
|
||||
reverse_dns_map_url=opts.reverse_dns_map_url,
|
||||
offline=opts.offline,
|
||||
normalize_timespan_threshold_hours=normalize_timespan_threshold_hours_value,
|
||||
should_reload=lambda: _reload_requested,
|
||||
config_reloading=lambda: _reload_requested,
|
||||
)
|
||||
except FileExistsError as error:
|
||||
logger.error("{0}".format(error.__str__()))
|
||||
@@ -2003,7 +2037,12 @@ def _main():
|
||||
if not _reload_requested:
|
||||
break
|
||||
|
||||
# Reload configuration
|
||||
# Reload configuration — emit the log message here (not in the
|
||||
# signal handler, which is not async-signal-safe), then clear the
|
||||
# flag so that any new SIGHUP arriving while we reload will be
|
||||
# captured for the next iteration rather than being silently dropped.
|
||||
logger.info("SIGHUP received, config will reload after current batch")
|
||||
_reload_requested = False
|
||||
logger.info("Reloading configuration...")
|
||||
try:
|
||||
# Build a fresh opts starting from CLI-only defaults so that
|
||||
@@ -2047,6 +2086,31 @@ def _main():
|
||||
if opts.debug:
|
||||
logger.setLevel(logging.DEBUG)
|
||||
|
||||
# Refresh FileHandler if log_file changed
|
||||
old_log_file = getattr(opts, "active_log_file", None)
|
||||
new_log_file = opts.log_file
|
||||
if old_log_file != new_log_file:
|
||||
# Remove old FileHandlers
|
||||
for h in list(logger.handlers):
|
||||
if isinstance(h, logging.FileHandler):
|
||||
h.close()
|
||||
logger.removeHandler(h)
|
||||
# Add new FileHandler if configured
|
||||
if new_log_file:
|
||||
try:
|
||||
fh = logging.FileHandler(new_log_file, "a")
|
||||
file_formatter = logging.Formatter(
|
||||
"%(asctime)s - %(levelname)s"
|
||||
" - [%(filename)s:%(lineno)d] - %(message)s"
|
||||
)
|
||||
fh.setFormatter(file_formatter)
|
||||
logger.addHandler(fh)
|
||||
except Exception as log_error:
|
||||
logger.warning(
|
||||
"Unable to write to log file: {}".format(log_error)
|
||||
)
|
||||
opts.active_log_file = new_log_file
|
||||
|
||||
logger.info("Configuration reloaded successfully")
|
||||
except Exception:
|
||||
logger.exception(
|
||||
|
||||
@@ -1,3 +1,3 @@
|
||||
__version__ = "9.2.1"
|
||||
__version__ = "9.3.0"
|
||||
|
||||
USER_AGENT = f"parsedmarc/{__version__}"
|
||||
|
||||
@@ -3,9 +3,7 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import logging.handlers
|
||||
import threading
|
||||
from typing import Any
|
||||
|
||||
from pygelf import GelfTcpHandler, GelfTlsHandler, GelfUdpHandler
|
||||
|
||||
@@ -14,6 +12,7 @@ from parsedmarc import (
|
||||
parsed_forensic_reports_to_csv_rows,
|
||||
parsed_smtp_tls_reports_to_csv_rows,
|
||||
)
|
||||
from parsedmarc.types import AggregateReport, ForensicReport, SMTPTLSReport
|
||||
|
||||
log_context_data = threading.local()
|
||||
|
||||
@@ -37,7 +36,7 @@ class GelfClient(object):
|
||||
"""
|
||||
self.host = host
|
||||
self.port = port
|
||||
self.logger = logging.getLogger("parsedmarc_syslog")
|
||||
self.logger = logging.getLogger("parsedmarc_gelf")
|
||||
self.logger.setLevel(logging.INFO)
|
||||
self.logger.addFilter(ContextFilter())
|
||||
self.gelf_mode = {
|
||||
@@ -50,7 +49,7 @@ class GelfClient(object):
|
||||
)
|
||||
self.logger.addHandler(self.handler)
|
||||
|
||||
def save_aggregate_report_to_gelf(self, aggregate_reports: list[dict[str, Any]]):
|
||||
def save_aggregate_report_to_gelf(self, aggregate_reports: list[AggregateReport]):
|
||||
rows = parsed_aggregate_reports_to_csv_rows(aggregate_reports)
|
||||
for row in rows:
|
||||
log_context_data.parsedmarc = row
|
||||
@@ -58,13 +57,13 @@ class GelfClient(object):
|
||||
|
||||
log_context_data.parsedmarc = None
|
||||
|
||||
def save_forensic_report_to_gelf(self, forensic_reports: list[dict[str, Any]]):
|
||||
def save_forensic_report_to_gelf(self, forensic_reports: list[ForensicReport]):
|
||||
rows = parsed_forensic_reports_to_csv_rows(forensic_reports)
|
||||
for row in rows:
|
||||
log_context_data.parsedmarc = row
|
||||
self.logger.info("parsedmarc forensic report")
|
||||
|
||||
def save_smtp_tls_report_to_gelf(self, smtp_tls_reports: dict[str, Any]):
|
||||
def save_smtp_tls_report_to_gelf(self, smtp_tls_reports: SMTPTLSReport):
|
||||
rows = parsed_smtp_tls_reports_to_csv_rows(smtp_tls_reports)
|
||||
for row in rows:
|
||||
log_context_data.parsedmarc = row
|
||||
|
||||
@@ -175,13 +175,15 @@ class GmailConnection(MailboxConnection):
|
||||
# Not needed
|
||||
pass
|
||||
|
||||
def watch(self, check_callback, check_timeout, should_reload=None):
|
||||
def watch(self, check_callback, check_timeout, config_reloading=None):
|
||||
"""Checks the mailbox for new messages every n seconds"""
|
||||
while True:
|
||||
sleep(check_timeout)
|
||||
check_callback(self)
|
||||
if should_reload and should_reload():
|
||||
if config_reloading and config_reloading():
|
||||
return
|
||||
sleep(check_timeout)
|
||||
if config_reloading and config_reloading():
|
||||
return
|
||||
check_callback(self)
|
||||
|
||||
@lru_cache(maxsize=10)
|
||||
def _find_label_id_for_label(self, label_name: str) -> str:
|
||||
|
||||
@@ -278,13 +278,15 @@ class MSGraphConnection(MailboxConnection):
|
||||
# Not needed
|
||||
pass
|
||||
|
||||
def watch(self, check_callback, check_timeout, should_reload=None):
|
||||
def watch(self, check_callback, check_timeout, config_reloading=None):
|
||||
"""Checks the mailbox for new messages every n seconds"""
|
||||
while True:
|
||||
sleep(check_timeout)
|
||||
check_callback(self)
|
||||
if should_reload and should_reload():
|
||||
if config_reloading and config_reloading():
|
||||
return
|
||||
sleep(check_timeout)
|
||||
if config_reloading and config_reloading():
|
||||
return
|
||||
check_callback(self)
|
||||
|
||||
@lru_cache(maxsize=10)
|
||||
def _find_folder_id_from_folder_path(self, folder_name: str) -> str:
|
||||
|
||||
@@ -81,7 +81,7 @@ class IMAPConnection(MailboxConnection):
|
||||
def keepalive(self):
|
||||
self._client.noop()
|
||||
|
||||
def watch(self, check_callback, check_timeout, should_reload=None):
|
||||
def watch(self, check_callback, check_timeout, config_reloading=None):
|
||||
"""
|
||||
Use an IDLE IMAP connection to parse incoming emails,
|
||||
and pass the results to a callback function
|
||||
@@ -94,6 +94,8 @@ class IMAPConnection(MailboxConnection):
|
||||
check_callback(self)
|
||||
|
||||
while True:
|
||||
if config_reloading and config_reloading():
|
||||
return
|
||||
try:
|
||||
IMAPClient(
|
||||
host=self._client.host,
|
||||
@@ -111,5 +113,5 @@ class IMAPConnection(MailboxConnection):
|
||||
except Exception as e:
|
||||
logger.warning("IMAP connection error. {0}. Reconnecting...".format(e))
|
||||
sleep(check_timeout)
|
||||
if should_reload and should_reload():
|
||||
if config_reloading and config_reloading():
|
||||
return
|
||||
|
||||
@@ -28,5 +28,5 @@ class MailboxConnection(ABC):
|
||||
def keepalive(self):
|
||||
raise NotImplementedError
|
||||
|
||||
def watch(self, check_callback, check_timeout, should_reload=None):
|
||||
def watch(self, check_callback, check_timeout, config_reloading=None):
|
||||
raise NotImplementedError
|
||||
|
||||
@@ -63,12 +63,14 @@ class MaildirConnection(MailboxConnection):
|
||||
def keepalive(self):
|
||||
return
|
||||
|
||||
def watch(self, check_callback, check_timeout, should_reload=None):
|
||||
def watch(self, check_callback, check_timeout, config_reloading=None):
|
||||
while True:
|
||||
if config_reloading and config_reloading():
|
||||
return
|
||||
try:
|
||||
check_callback(self)
|
||||
except Exception as e:
|
||||
logger.warning("Maildir init error. {0}".format(e))
|
||||
if should_reload and should_reload():
|
||||
if config_reloading and config_reloading():
|
||||
return
|
||||
sleep(check_timeout)
|
||||
|
||||
@@ -93,3 +93,11 @@ class S3Client(object):
|
||||
self.bucket.put_object(
|
||||
Body=json.dumps(report), Key=object_path, Metadata=object_metadata
|
||||
)
|
||||
|
||||
def close(self):
|
||||
"""Clean up the boto3 resource."""
|
||||
try:
|
||||
if self.s3.meta is not None:
|
||||
self.s3.meta.client.close()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
@@ -207,3 +207,7 @@ class HECClient(object):
|
||||
raise SplunkError(e.__str__())
|
||||
if response["code"] != 0:
|
||||
raise SplunkError(response["text"])
|
||||
|
||||
def close(self):
|
||||
"""Close the underlying HTTP session."""
|
||||
self.session.close()
|
||||
|
||||
96
tests.py
96
tests.py
@@ -12,10 +12,11 @@ from base64 import urlsafe_b64encode
|
||||
from glob import glob
|
||||
from pathlib import Path
|
||||
from tempfile import NamedTemporaryFile, TemporaryDirectory
|
||||
from typing import cast
|
||||
from types import SimpleNamespace
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
from lxml import etree
|
||||
from lxml import etree # type: ignore[import-untyped]
|
||||
from googleapiclient.errors import HttpError
|
||||
from httplib2 import Response
|
||||
from imapclient.exceptions import IMAPClientError
|
||||
@@ -32,6 +33,7 @@ from parsedmarc.mail.imap import IMAPConnection
|
||||
import parsedmarc.mail.gmail as gmail_module
|
||||
import parsedmarc.mail.graph as graph_module
|
||||
import parsedmarc.mail.imap as imap_module
|
||||
import parsedmarc.elastic
|
||||
import parsedmarc.opensearch as opensearch_module
|
||||
import parsedmarc.utils
|
||||
|
||||
@@ -154,7 +156,7 @@ class Test(unittest.TestCase):
|
||||
report_path,
|
||||
offline=True,
|
||||
)
|
||||
self.assertEqual(result["report_type"], "aggregate")
|
||||
assert result["report_type"] == "aggregate"
|
||||
self.assertEqual(result["report"]["report_metadata"]["org_name"], "outlook.com")
|
||||
|
||||
def testParseReportFileAcceptsPathForEmail(self):
|
||||
@@ -165,7 +167,7 @@ class Test(unittest.TestCase):
|
||||
report_path,
|
||||
offline=True,
|
||||
)
|
||||
self.assertEqual(result["report_type"], "aggregate")
|
||||
assert result["report_type"] == "aggregate"
|
||||
self.assertEqual(result["report"]["report_metadata"]["org_name"], "google.com")
|
||||
|
||||
def testAggregateSamples(self):
|
||||
@@ -176,10 +178,11 @@ class Test(unittest.TestCase):
|
||||
if os.path.isdir(sample_path):
|
||||
continue
|
||||
print("Testing {0}: ".format(sample_path), end="")
|
||||
parsed_report = parsedmarc.parse_report_file(
|
||||
result = parsedmarc.parse_report_file(
|
||||
sample_path, always_use_local_files=True, offline=OFFLINE_MODE
|
||||
)["report"]
|
||||
parsedmarc.parsed_aggregate_reports_to_csv(parsed_report)
|
||||
)
|
||||
assert result["report_type"] == "aggregate"
|
||||
parsedmarc.parsed_aggregate_reports_to_csv(result["report"])
|
||||
print("Passed!")
|
||||
|
||||
def testEmptySample(self):
|
||||
@@ -195,13 +198,13 @@ class Test(unittest.TestCase):
|
||||
print("Testing {0}: ".format(sample_path), end="")
|
||||
with open(sample_path) as sample_file:
|
||||
sample_content = sample_file.read()
|
||||
parsed_report = parsedmarc.parse_report_email(
|
||||
email_result = parsedmarc.parse_report_email(
|
||||
sample_content, offline=OFFLINE_MODE
|
||||
)["report"]
|
||||
parsed_report = parsedmarc.parse_report_file(
|
||||
sample_path, offline=OFFLINE_MODE
|
||||
)["report"]
|
||||
parsedmarc.parsed_forensic_reports_to_csv(parsed_report)
|
||||
)
|
||||
assert email_result["report_type"] == "forensic"
|
||||
result = parsedmarc.parse_report_file(sample_path, offline=OFFLINE_MODE)
|
||||
assert result["report_type"] == "forensic"
|
||||
parsedmarc.parsed_forensic_reports_to_csv(result["report"])
|
||||
print("Passed!")
|
||||
|
||||
def testSmtpTlsSamples(self):
|
||||
@@ -212,10 +215,9 @@ class Test(unittest.TestCase):
|
||||
if os.path.isdir(sample_path):
|
||||
continue
|
||||
print("Testing {0}: ".format(sample_path), end="")
|
||||
parsed_report = parsedmarc.parse_report_file(
|
||||
sample_path, offline=OFFLINE_MODE
|
||||
)["report"]
|
||||
parsedmarc.parsed_smtp_tls_reports_to_csv(parsed_report)
|
||||
result = parsedmarc.parse_report_file(sample_path, offline=OFFLINE_MODE)
|
||||
assert result["report_type"] == "smtp_tls"
|
||||
parsedmarc.parsed_smtp_tls_reports_to_csv(result["report"])
|
||||
print("Passed!")
|
||||
|
||||
def testOpenSearchSigV4RequiresRegion(self):
|
||||
@@ -1278,7 +1280,7 @@ class TestMailboxWatchSince(unittest.TestCase):
|
||||
def testWatchInboxPassesSinceToMailboxFetch(self):
|
||||
mailbox_connection = SimpleNamespace()
|
||||
|
||||
def fake_watch(check_callback, check_timeout, should_reload=None):
|
||||
def fake_watch(check_callback, check_timeout, config_reloading=None):
|
||||
check_callback(mailbox_connection)
|
||||
raise _BreakLoop()
|
||||
|
||||
@@ -1289,7 +1291,9 @@ class TestMailboxWatchSince(unittest.TestCase):
|
||||
) as mocked:
|
||||
with self.assertRaises(_BreakLoop):
|
||||
parsedmarc.watch_inbox(
|
||||
mailbox_connection=mailbox_connection,
|
||||
mailbox_connection=cast(
|
||||
parsedmarc.MailboxConnection, mailbox_connection
|
||||
),
|
||||
callback=callback,
|
||||
check_timeout=1,
|
||||
batch_size=10,
|
||||
@@ -1337,30 +1341,30 @@ since = 2d
|
||||
self.assertEqual(mock_watch_inbox.call_args.kwargs.get("since"), "2d")
|
||||
|
||||
|
||||
class _DummyMailboxConnection:
|
||||
class _DummyMailboxConnection(parsedmarc.MailboxConnection):
|
||||
def __init__(self):
|
||||
self.fetch_calls = []
|
||||
self.fetch_calls: list[dict[str, object]] = []
|
||||
|
||||
def create_folder(self, folder_name):
|
||||
def create_folder(self, folder_name: str):
|
||||
return None
|
||||
|
||||
def fetch_messages(self, reports_folder, **kwargs):
|
||||
def fetch_messages(self, reports_folder: str, **kwargs):
|
||||
self.fetch_calls.append({"reports_folder": reports_folder, **kwargs})
|
||||
return []
|
||||
|
||||
def fetch_message(self, message_id, **kwargs):
|
||||
def fetch_message(self, message_id) -> str:
|
||||
return ""
|
||||
|
||||
def delete_message(self, message_id):
|
||||
return None
|
||||
|
||||
def move_message(self, message_id, folder_name):
|
||||
def move_message(self, message_id, folder_name: str):
|
||||
return None
|
||||
|
||||
def keepalive(self):
|
||||
return None
|
||||
|
||||
def watch(self, check_callback, check_timeout):
|
||||
def watch(self, check_callback, check_timeout, config_reloading=None):
|
||||
return None
|
||||
|
||||
|
||||
@@ -1446,7 +1450,7 @@ mailbox = shared@example.com
|
||||
parsedmarc.cli._main()
|
||||
|
||||
self.assertEqual(system_exit.exception.code, -1)
|
||||
mock_logger.error.assert_called_once_with(
|
||||
mock_logger.critical.assert_called_once_with(
|
||||
"certificate_path setting missing from the msgraph config section"
|
||||
)
|
||||
mock_graph_connection.assert_not_called()
|
||||
@@ -1518,7 +1522,7 @@ user = owner@example.com
|
||||
parsedmarc.cli._main()
|
||||
|
||||
self.assertEqual(system_exit.exception.code, -1)
|
||||
mock_logger.error.assert_called_once_with(
|
||||
mock_logger.critical.assert_called_once_with(
|
||||
"password setting missing from the msgraph config section"
|
||||
)
|
||||
mock_graph_connection.assert_not_called()
|
||||
@@ -1559,7 +1563,7 @@ class TestMSGraphFolderFallback(unittest.TestCase):
|
||||
def testWellKnownFolderFallback(self):
|
||||
connection = MSGraphConnection.__new__(MSGraphConnection)
|
||||
connection.mailbox_name = "shared@example.com"
|
||||
connection._client = _FakeGraphClient()
|
||||
connection._client = _FakeGraphClient() # type: ignore[assignment]
|
||||
connection._request_with_retries = MagicMock(
|
||||
side_effect=lambda method_name, *args, **kwargs: getattr(
|
||||
connection._client, method_name
|
||||
@@ -1579,7 +1583,7 @@ class TestMSGraphFolderFallback(unittest.TestCase):
|
||||
def testUnknownFolderStillFails(self):
|
||||
connection = MSGraphConnection.__new__(MSGraphConnection)
|
||||
connection.mailbox_name = "shared@example.com"
|
||||
connection._client = _FakeGraphClient()
|
||||
connection._client = _FakeGraphClient() # type: ignore[assignment]
|
||||
connection._request_with_retries = MagicMock(
|
||||
side_effect=lambda method_name, *args, **kwargs: getattr(
|
||||
connection._client, method_name
|
||||
@@ -1675,7 +1679,7 @@ mailbox = shared@example.com
|
||||
parsedmarc.cli._main()
|
||||
|
||||
self.assertEqual(system_exit.exception.code, -1)
|
||||
mock_logger.error.assert_called_once_with(
|
||||
mock_logger.critical.assert_called_once_with(
|
||||
"client_secret setting missing from the msgraph config section"
|
||||
)
|
||||
mock_graph_connection.assert_not_called()
|
||||
@@ -1707,7 +1711,7 @@ mailbox = shared@example.com
|
||||
parsedmarc.cli._main()
|
||||
|
||||
self.assertEqual(system_exit.exception.code, -1)
|
||||
mock_logger.error.assert_called_once_with(
|
||||
mock_logger.critical.assert_called_once_with(
|
||||
"tenant_id setting missing from the msgraph config section"
|
||||
)
|
||||
mock_graph_connection.assert_not_called()
|
||||
@@ -1739,7 +1743,7 @@ tenant_id = tenant-id
|
||||
parsedmarc.cli._main()
|
||||
|
||||
self.assertEqual(system_exit.exception.code, -1)
|
||||
mock_logger.error.assert_called_once_with(
|
||||
mock_logger.critical.assert_called_once_with(
|
||||
"mailbox setting missing from the msgraph config section"
|
||||
)
|
||||
mock_graph_connection.assert_not_called()
|
||||
@@ -1809,7 +1813,7 @@ mailbox = shared@example.com
|
||||
parsedmarc.cli._main()
|
||||
|
||||
self.assertEqual(system_exit.exception.code, -1)
|
||||
mock_logger.error.assert_called_once_with(
|
||||
mock_logger.critical.assert_called_once_with(
|
||||
"tenant_id setting missing from the msgraph config section"
|
||||
)
|
||||
mock_graph_connection.assert_not_called()
|
||||
@@ -1840,7 +1844,7 @@ tenant_id = tenant-id
|
||||
parsedmarc.cli._main()
|
||||
|
||||
self.assertEqual(system_exit.exception.code, -1)
|
||||
mock_logger.error.assert_called_once_with(
|
||||
mock_logger.critical.assert_called_once_with(
|
||||
"mailbox setting missing from the msgraph config section"
|
||||
)
|
||||
mock_graph_connection.assert_not_called()
|
||||
@@ -1872,7 +1876,7 @@ certificate_path = /tmp/msgraph-cert.pem
|
||||
parsedmarc.cli._main()
|
||||
|
||||
self.assertEqual(system_exit.exception.code, -1)
|
||||
mock_logger.error.assert_called_once_with(
|
||||
mock_logger.critical.assert_called_once_with(
|
||||
"tenant_id setting missing from the msgraph config section"
|
||||
)
|
||||
mock_graph_connection.assert_not_called()
|
||||
@@ -1904,7 +1908,7 @@ certificate_path = /tmp/msgraph-cert.pem
|
||||
parsedmarc.cli._main()
|
||||
|
||||
self.assertEqual(system_exit.exception.code, -1)
|
||||
mock_logger.error.assert_called_once_with(
|
||||
mock_logger.critical.assert_called_once_with(
|
||||
"mailbox setting missing from the msgraph config section"
|
||||
)
|
||||
mock_graph_connection.assert_not_called()
|
||||
@@ -1926,7 +1930,10 @@ password = pass
|
||||
watch = true
|
||||
"""
|
||||
|
||||
@unittest.skipUnless(hasattr(signal, "SIGHUP"), "SIGHUP not available on this platform")
|
||||
@unittest.skipUnless(
|
||||
hasattr(signal, "SIGHUP"),
|
||||
"SIGHUP not available on this platform",
|
||||
)
|
||||
@patch("parsedmarc.cli._init_output_clients")
|
||||
@patch("parsedmarc.cli._parse_config_file")
|
||||
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
|
||||
@@ -1992,7 +1999,10 @@ watch = true
|
||||
# _parse_config_file called for initial load + reload
|
||||
self.assertGreaterEqual(mock_parse_config.call_count, 2)
|
||||
|
||||
@unittest.skipUnless(hasattr(signal, "SIGHUP"), "SIGHUP not available on this platform")
|
||||
@unittest.skipUnless(
|
||||
hasattr(signal, "SIGHUP"),
|
||||
"SIGHUP not available on this platform",
|
||||
)
|
||||
@patch("parsedmarc.cli._init_output_clients")
|
||||
@patch("parsedmarc.cli._parse_config_file")
|
||||
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
|
||||
@@ -2065,7 +2075,10 @@ watch = true
|
||||
# The failed reload must not have closed the original clients
|
||||
initial_clients["s3_client"].close.assert_not_called()
|
||||
|
||||
@unittest.skipUnless(hasattr(signal, "SIGHUP"), "SIGHUP not available on this platform")
|
||||
@unittest.skipUnless(
|
||||
hasattr(signal, "SIGHUP"),
|
||||
"SIGHUP not available on this platform",
|
||||
)
|
||||
@patch("parsedmarc.cli._init_output_clients")
|
||||
@patch("parsedmarc.cli._parse_config_file")
|
||||
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
|
||||
@@ -2137,7 +2150,10 @@ watch = true
|
||||
# Old client must have been closed when reload succeeded
|
||||
old_client.close.assert_called_once()
|
||||
|
||||
@unittest.skipUnless(hasattr(signal, "SIGHUP"), "SIGHUP not available on this platform")
|
||||
@unittest.skipUnless(
|
||||
hasattr(signal, "SIGHUP"),
|
||||
"SIGHUP not available on this platform",
|
||||
)
|
||||
@patch("parsedmarc.cli._init_output_clients")
|
||||
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
|
||||
@patch("parsedmarc.cli.watch_inbox")
|
||||
|
||||
Reference in New Issue
Block a user