mirror of
https://github.com/domainaware/parsedmarc.git
synced 2026-03-21 22:12:45 +00:00
Compare commits
12 Commits
9.3.0
...
copilot/su
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3509e5c553 | ||
|
|
964a1baeed | ||
|
|
7688d00226 | ||
|
|
8796c7e3bd | ||
|
|
a05eb0807a | ||
|
|
6fceb3e2ce | ||
|
|
510d5d05a9 | ||
|
|
3445438684 | ||
|
|
17defb75b0 | ||
|
|
893d0a4f03 | ||
|
|
58e07140a8 | ||
|
|
dfdffe4947 |
@@ -7,8 +7,7 @@
|
||||
"Bash(GITHUB_ACTIONS=true pytest --cov tests.py)",
|
||||
"Bash(ls tests*)",
|
||||
"Bash(GITHUB_ACTIONS=true python -m pytest --cov tests.py -x)",
|
||||
"Bash(GITHUB_ACTIONS=true python -m pytest tests.py -x -v)",
|
||||
"Bash(python -m pytest tests.py --no-header -q)"
|
||||
"Bash(GITHUB_ACTIONS=true python -m pytest tests.py -x -v)"
|
||||
],
|
||||
"additionalDirectories": [
|
||||
"/tmp"
|
||||
|
||||
23
CHANGELOG.md
23
CHANGELOG.md
@@ -4,25 +4,20 @@
|
||||
|
||||
### Added
|
||||
|
||||
- SIGHUP-based configuration reload for watch mode — update output destinations, DNS/GeoIP settings, processing flags, and log level without restarting the service or interrupting in-progress report processing.
|
||||
- Use `systemctl reload parsedmarc` when running under `systemd`.
|
||||
- On a successful reload, old output clients are closed and recreated.
|
||||
- On a failed reload, the previous configuration remains fully active.
|
||||
- `close()` methods on `GelfClient`, `KafkaClient`, `SyslogClient`, `WebhookClient`, HECClient, and `S3Client` for clean resource teardown on reload.
|
||||
- `config_reloading` parameter on all `MailboxConnection.watch()` implementations and `watch_inbox()` to ensure SIGHUP never triggers a new email batch mid-reload.
|
||||
- Elasticsearch and OpenSearch connections are now tracked and cleaned up on reload via `_close_output_clients()`.
|
||||
- Extracted `_parse_config_file()` and `_init_output_clients()` from `_main()` in `cli.py` to support config reload and reduce code duplication.
|
||||
|
||||
### Fixed
|
||||
|
||||
- `get_index_prefix()` crashed on forensic reports with `TypeError` due to `report()` instead of `report[]` dict access.
|
||||
- Missing `exit(1)` after IMAP user/password validation failure allowed execution to continue with `None` credentials.
|
||||
- SIGHUP-based configuration reload for watch mode — update output
|
||||
destinations, DNS/GeoIP settings, processing flags, and log level
|
||||
without restarting the service or interrupting in-progress report
|
||||
processing. Use `systemctl reload parsedmarc` when running under
|
||||
systemd.
|
||||
- Extracted `_parse_config_file()` and `_init_output_clients()` from
|
||||
`_main()` in `cli.py` to support config reload and reduce code
|
||||
duplication.
|
||||
|
||||
## 9.2.1
|
||||
|
||||
### Added
|
||||
|
||||
- Better checking of `msgraph` configuration (PR #695)
|
||||
- Better checking of `msconfig` configuration (PR #695)
|
||||
|
||||
### Changed
|
||||
|
||||
|
||||
@@ -666,15 +666,8 @@ Mailbox connection settings (IMAP host/credentials, Microsoft Graph,
|
||||
Gmail API, Maildir path) are **not** reloaded — changing those still
|
||||
requires a full restart.
|
||||
|
||||
On a **successful** reload, existing output client connections are
|
||||
closed and new ones are created from the updated configuration. The
|
||||
service then resumes watching with the new settings.
|
||||
|
||||
If the new configuration file contains errors (missing required
|
||||
settings, unreachable output destinations, etc.), the **entire reload
|
||||
is aborted** — no output clients are replaced and the previous
|
||||
configuration remains fully active. This means a typo in one section
|
||||
will not take down an otherwise working setup. Check the logs for
|
||||
If the new configuration file contains errors, the reload is aborted
|
||||
and the previous configuration remains active. Check the logs for
|
||||
details:
|
||||
|
||||
```bash
|
||||
|
||||
@@ -2195,7 +2195,7 @@ def watch_inbox(
|
||||
batch_size: int = 10,
|
||||
since: Optional[Union[datetime, date, str]] = None,
|
||||
normalize_timespan_threshold_hours: float = 24,
|
||||
config_reloading: Optional[Callable] = None,
|
||||
should_reload: Optional[Callable] = None,
|
||||
):
|
||||
"""
|
||||
Watches the mailbox for new messages and
|
||||
@@ -2223,7 +2223,7 @@ def watch_inbox(
|
||||
batch_size (int): Number of messages to read and process before saving
|
||||
since: Search for messages since certain time
|
||||
normalize_timespan_threshold_hours (float): Normalize timespans beyond this
|
||||
config_reloading: Optional callable that returns True when a config
|
||||
should_reload: Optional callable that returns True when a config
|
||||
reload has been requested (e.g. via SIGHUP)
|
||||
"""
|
||||
|
||||
@@ -2253,8 +2253,8 @@ def watch_inbox(
|
||||
"check_callback": check_callback,
|
||||
"check_timeout": check_timeout,
|
||||
}
|
||||
if config_reloading is not None:
|
||||
watch_kwargs["config_reloading"] = config_reloading
|
||||
if should_reload is not None:
|
||||
watch_kwargs["should_reload"] = should_reload
|
||||
|
||||
mailbox_connection.watch(**watch_kwargs)
|
||||
|
||||
|
||||
@@ -201,8 +201,21 @@ def _parse_config_file(config_file, opts):
|
||||
"normalize_timespan_threshold_hours"
|
||||
)
|
||||
if "index_prefix_domain_map" in general_config:
|
||||
with open(general_config["index_prefix_domain_map"]) as f:
|
||||
index_prefix_domain_map = yaml.safe_load(f)
|
||||
map_path = general_config["index_prefix_domain_map"]
|
||||
try:
|
||||
with open(map_path) as f:
|
||||
index_prefix_domain_map = yaml.safe_load(f)
|
||||
except OSError as exc:
|
||||
raise ConfigurationError(
|
||||
"Failed to read index_prefix_domain_map file '{0}': {1}".format(
|
||||
map_path, exc
|
||||
)
|
||||
) from exc
|
||||
except yaml.YAMLError as exc:
|
||||
raise ConfigurationError(
|
||||
"Failed to parse YAML in index_prefix_domain_map "
|
||||
"file '{0}': {1}".format(map_path, exc)
|
||||
) from exc
|
||||
if "offline" in general_config:
|
||||
opts.offline = bool(general_config.getboolean("offline"))
|
||||
if "strip_attachment_payloads" in general_config:
|
||||
@@ -747,15 +760,15 @@ def _parse_config_file(config_file, opts):
|
||||
if "oauth2_port" in gmail_api_config:
|
||||
opts.gmail_api_oauth2_port = gmail_api_config.getint("oauth2_port", 8080)
|
||||
if "auth_mode" in gmail_api_config:
|
||||
opts.gmail_api_auth_mode = gmail_api_config["auth_mode"].strip()
|
||||
opts.gmail_api_auth_mode = gmail_api_config.get("auth_mode").strip()
|
||||
if "service_account_user" in gmail_api_config:
|
||||
opts.gmail_api_service_account_user = gmail_api_config[
|
||||
opts.gmail_api_service_account_user = gmail_api_config.get(
|
||||
"service_account_user"
|
||||
].strip()
|
||||
).strip()
|
||||
elif "delegated_user" in gmail_api_config:
|
||||
opts.gmail_api_service_account_user = gmail_api_config[
|
||||
opts.gmail_api_service_account_user = gmail_api_config.get(
|
||||
"delegated_user"
|
||||
].strip()
|
||||
).strip()
|
||||
|
||||
if "maildir" in config.sections():
|
||||
maildir_api_config = config["maildir"]
|
||||
@@ -810,38 +823,6 @@ def _parse_config_file(config_file, opts):
|
||||
return index_prefix_domain_map
|
||||
|
||||
|
||||
class _ElasticsearchHandle:
|
||||
"""Sentinel so Elasticsearch participates in _close_output_clients."""
|
||||
|
||||
def close(self):
|
||||
try:
|
||||
conn = elastic.connections.get_connection()
|
||||
if not isinstance(conn, str):
|
||||
conn.close()
|
||||
except Exception:
|
||||
pass
|
||||
try:
|
||||
elastic.connections.remove_connection("default")
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
class _OpenSearchHandle:
|
||||
"""Sentinel so OpenSearch participates in _close_output_clients."""
|
||||
|
||||
def close(self):
|
||||
try:
|
||||
conn = opensearch.connections.get_connection()
|
||||
if not isinstance(conn, str):
|
||||
conn.close()
|
||||
except Exception:
|
||||
pass
|
||||
try:
|
||||
opensearch.connections.remove_connection("default")
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
def _init_output_clients(opts):
|
||||
"""Create output clients based on current opts.
|
||||
|
||||
@@ -851,85 +832,15 @@ def _init_output_clients(opts):
|
||||
Raises:
|
||||
ConfigurationError: If a required output client cannot be created.
|
||||
"""
|
||||
# Validate all required settings before creating any clients so that a
|
||||
# ConfigurationError does not leave partially-created clients un-closed.
|
||||
if opts.hec and (opts.hec_token is None or opts.hec_index is None):
|
||||
raise ConfigurationError(
|
||||
"HEC token and HEC index are required when using HEC URL"
|
||||
)
|
||||
|
||||
clients = {}
|
||||
|
||||
if opts.s3_bucket:
|
||||
clients["s3_client"] = s3.S3Client(
|
||||
bucket_name=opts.s3_bucket,
|
||||
bucket_path=opts.s3_path,
|
||||
region_name=opts.s3_region_name,
|
||||
endpoint_url=opts.s3_endpoint_url,
|
||||
access_key_id=opts.s3_access_key_id,
|
||||
secret_access_key=opts.s3_secret_access_key,
|
||||
)
|
||||
|
||||
if opts.syslog_server:
|
||||
clients["syslog_client"] = syslog.SyslogClient(
|
||||
server_name=opts.syslog_server,
|
||||
server_port=int(opts.syslog_port),
|
||||
protocol=opts.syslog_protocol or "udp",
|
||||
cafile_path=opts.syslog_cafile_path,
|
||||
certfile_path=opts.syslog_certfile_path,
|
||||
keyfile_path=opts.syslog_keyfile_path,
|
||||
timeout=opts.syslog_timeout if opts.syslog_timeout is not None else 5.0,
|
||||
retry_attempts=opts.syslog_retry_attempts
|
||||
if opts.syslog_retry_attempts is not None
|
||||
else 3,
|
||||
retry_delay=opts.syslog_retry_delay
|
||||
if opts.syslog_retry_delay is not None
|
||||
else 5,
|
||||
)
|
||||
|
||||
if opts.hec:
|
||||
if opts.hec_token is None or opts.hec_index is None:
|
||||
raise ConfigurationError(
|
||||
"HEC token and HEC index are required when using HEC URL"
|
||||
)
|
||||
verify = True
|
||||
if opts.hec_skip_certificate_verification:
|
||||
verify = False
|
||||
clients["hec_client"] = splunk.HECClient(
|
||||
opts.hec, opts.hec_token, opts.hec_index, verify=verify
|
||||
)
|
||||
|
||||
if opts.kafka_hosts:
|
||||
ssl_context = None
|
||||
if opts.kafka_skip_certificate_verification:
|
||||
logger.debug("Skipping Kafka certificate verification")
|
||||
ssl_context = create_default_context()
|
||||
ssl_context.check_hostname = False
|
||||
ssl_context.verify_mode = CERT_NONE
|
||||
clients["kafka_client"] = kafkaclient.KafkaClient(
|
||||
opts.kafka_hosts,
|
||||
username=opts.kafka_username,
|
||||
password=opts.kafka_password,
|
||||
ssl_context=ssl_context,
|
||||
)
|
||||
|
||||
if opts.gelf_host:
|
||||
clients["gelf_client"] = gelf.GelfClient(
|
||||
host=opts.gelf_host,
|
||||
port=int(opts.gelf_port),
|
||||
mode=opts.gelf_mode,
|
||||
)
|
||||
|
||||
if (
|
||||
opts.webhook_aggregate_url
|
||||
or opts.webhook_forensic_url
|
||||
or opts.webhook_smtp_tls_url
|
||||
):
|
||||
clients["webhook_client"] = webhook.WebhookClient(
|
||||
aggregate_url=opts.webhook_aggregate_url,
|
||||
forensic_url=opts.webhook_forensic_url,
|
||||
smtp_tls_url=opts.webhook_smtp_tls_url,
|
||||
timeout=opts.webhook_timeout,
|
||||
)
|
||||
|
||||
# Elasticsearch and OpenSearch mutate module-level global state via
|
||||
# connections.create_connection(), which cannot be rolled back if a later
|
||||
# step fails. Initialise them last so that all other clients are created
|
||||
# successfully first; this minimises the window for partial-init problems
|
||||
# during config reload.
|
||||
if opts.save_aggregate or opts.save_forensic or opts.save_smtp_tls:
|
||||
if opts.elasticsearch_hosts:
|
||||
es_aggregate_index = "dmarc_aggregate"
|
||||
@@ -963,7 +874,6 @@ def _init_output_clients(opts):
|
||||
aggregate_indexes=[es_aggregate_index],
|
||||
forensic_indexes=[es_forensic_index],
|
||||
)
|
||||
clients["elasticsearch"] = _ElasticsearchHandle()
|
||||
|
||||
if opts.opensearch_hosts:
|
||||
os_aggregate_index = "dmarc_aggregate"
|
||||
@@ -1000,7 +910,100 @@ def _init_output_clients(opts):
|
||||
aggregate_indexes=[os_aggregate_index],
|
||||
forensic_indexes=[os_forensic_index],
|
||||
)
|
||||
clients["opensearch"] = _OpenSearchHandle()
|
||||
|
||||
if opts.s3_bucket:
|
||||
try:
|
||||
clients["s3_client"] = s3.S3Client(
|
||||
bucket_name=opts.s3_bucket,
|
||||
bucket_path=opts.s3_path,
|
||||
region_name=opts.s3_region_name,
|
||||
endpoint_url=opts.s3_endpoint_url,
|
||||
access_key_id=opts.s3_access_key_id,
|
||||
secret_access_key=opts.s3_secret_access_key,
|
||||
)
|
||||
except Exception:
|
||||
logger.warning("Failed to initialize S3 client; skipping", exc_info=True)
|
||||
|
||||
if opts.syslog_server:
|
||||
try:
|
||||
clients["syslog_client"] = syslog.SyslogClient(
|
||||
server_name=opts.syslog_server,
|
||||
server_port=int(opts.syslog_port),
|
||||
protocol=opts.syslog_protocol or "udp",
|
||||
cafile_path=opts.syslog_cafile_path,
|
||||
certfile_path=opts.syslog_certfile_path,
|
||||
keyfile_path=opts.syslog_keyfile_path,
|
||||
timeout=opts.syslog_timeout if opts.syslog_timeout is not None else 5.0,
|
||||
retry_attempts=opts.syslog_retry_attempts
|
||||
if opts.syslog_retry_attempts is not None
|
||||
else 3,
|
||||
retry_delay=opts.syslog_retry_delay
|
||||
if opts.syslog_retry_delay is not None
|
||||
else 5,
|
||||
)
|
||||
except Exception:
|
||||
logger.warning(
|
||||
"Failed to initialize syslog client; skipping", exc_info=True
|
||||
)
|
||||
|
||||
if opts.hec:
|
||||
verify = True
|
||||
if opts.hec_skip_certificate_verification:
|
||||
verify = False
|
||||
try:
|
||||
clients["hec_client"] = splunk.HECClient(
|
||||
opts.hec, opts.hec_token, opts.hec_index, verify=verify
|
||||
)
|
||||
except Exception:
|
||||
logger.warning(
|
||||
"Failed to initialize Splunk HEC client; skipping", exc_info=True
|
||||
)
|
||||
|
||||
if opts.kafka_hosts:
|
||||
ssl_context = None
|
||||
if opts.kafka_ssl:
|
||||
ssl_context = create_default_context()
|
||||
if opts.kafka_skip_certificate_verification:
|
||||
logger.debug("Skipping Kafka certificate verification")
|
||||
ssl_context.check_hostname = False
|
||||
ssl_context.verify_mode = CERT_NONE
|
||||
try:
|
||||
clients["kafka_client"] = kafkaclient.KafkaClient(
|
||||
opts.kafka_hosts,
|
||||
username=opts.kafka_username,
|
||||
password=opts.kafka_password,
|
||||
ssl=opts.kafka_ssl,
|
||||
ssl_context=ssl_context,
|
||||
)
|
||||
except Exception:
|
||||
logger.warning("Failed to initialize Kafka client; skipping", exc_info=True)
|
||||
|
||||
if opts.gelf_host:
|
||||
try:
|
||||
clients["gelf_client"] = gelf.GelfClient(
|
||||
host=opts.gelf_host,
|
||||
port=int(opts.gelf_port),
|
||||
mode=opts.gelf_mode,
|
||||
)
|
||||
except Exception:
|
||||
logger.warning("Failed to initialize GELF client; skipping", exc_info=True)
|
||||
|
||||
if (
|
||||
opts.webhook_aggregate_url
|
||||
or opts.webhook_forensic_url
|
||||
or opts.webhook_smtp_tls_url
|
||||
):
|
||||
try:
|
||||
clients["webhook_client"] = webhook.WebhookClient(
|
||||
aggregate_url=opts.webhook_aggregate_url,
|
||||
forensic_url=opts.webhook_forensic_url,
|
||||
smtp_tls_url=opts.webhook_smtp_tls_url,
|
||||
timeout=opts.webhook_timeout,
|
||||
)
|
||||
except Exception:
|
||||
logger.warning(
|
||||
"Failed to initialize webhook client; skipping", exc_info=True
|
||||
)
|
||||
|
||||
return clients
|
||||
|
||||
@@ -1032,7 +1035,7 @@ def _main():
|
||||
if "policy_published" in report:
|
||||
domain = report["policy_published"]["domain"]
|
||||
elif "reported_domain" in report:
|
||||
domain = report["reported_domain"]
|
||||
domain = report("reported_domain")
|
||||
elif "policies" in report:
|
||||
domain = report["policies"][0]["domain"]
|
||||
if domain:
|
||||
@@ -1627,7 +1630,7 @@ def _main():
|
||||
try:
|
||||
index_prefix_domain_map = _parse_config_file(args.config_file, opts)
|
||||
except ConfigurationError as e:
|
||||
logger.critical(str(e))
|
||||
logger.error(str(e))
|
||||
exit(-1)
|
||||
|
||||
logger.setLevel(logging.ERROR)
|
||||
@@ -1649,8 +1652,6 @@ def _main():
|
||||
except Exception as error:
|
||||
logger.warning("Unable to write to log file: {}".format(error))
|
||||
|
||||
opts.active_log_file = opts.log_file
|
||||
|
||||
if (
|
||||
opts.imap_host is None
|
||||
and opts.graph_client_id is None
|
||||
@@ -1666,14 +1667,14 @@ def _main():
|
||||
# Initialize output clients
|
||||
try:
|
||||
clients = _init_output_clients(opts)
|
||||
except elastic.ElasticsearchError as e:
|
||||
logger.exception("Elasticsearch Error: {0}".format(e))
|
||||
except elastic.ElasticsearchError:
|
||||
logger.exception("Elasticsearch Error")
|
||||
exit(1)
|
||||
except opensearch.OpenSearchError as e:
|
||||
logger.exception("OpenSearch Error: {0}".format(e))
|
||||
except opensearch.OpenSearchError:
|
||||
logger.exception("OpenSearch Error")
|
||||
exit(1)
|
||||
except ConfigurationError as e:
|
||||
logger.critical(str(e))
|
||||
logger.error(str(e))
|
||||
exit(1)
|
||||
except Exception as error_:
|
||||
logger.error("Output client error: {0}".format(error_))
|
||||
@@ -1809,9 +1810,8 @@ def _main():
|
||||
try:
|
||||
if opts.imap_user is None or opts.imap_password is None:
|
||||
logger.error(
|
||||
"IMAP user and password must be specified if host is specified"
|
||||
"IMAP user and password must be specified ifhost is specified"
|
||||
)
|
||||
exit(1)
|
||||
|
||||
ssl = True
|
||||
verify = True
|
||||
@@ -1988,9 +1988,8 @@ def _main():
|
||||
|
||||
def _handle_sighup(signum, frame):
|
||||
nonlocal _reload_requested
|
||||
# Logging is not async-signal-safe; only set the flag here.
|
||||
# The log message is emitted from the main loop when the flag is read.
|
||||
_reload_requested = True
|
||||
logger.info("SIGHUP received, config will reload after current batch")
|
||||
|
||||
if hasattr(signal, "SIGHUP"):
|
||||
signal.signal(signal.SIGHUP, _handle_sighup)
|
||||
@@ -2025,7 +2024,7 @@ def _main():
|
||||
reverse_dns_map_url=opts.reverse_dns_map_url,
|
||||
offline=opts.offline,
|
||||
normalize_timespan_threshold_hours=normalize_timespan_threshold_hours_value,
|
||||
config_reloading=lambda: _reload_requested,
|
||||
should_reload=lambda: _reload_requested,
|
||||
)
|
||||
except FileExistsError as error:
|
||||
logger.error("{0}".format(error.__str__()))
|
||||
@@ -2037,11 +2036,9 @@ def _main():
|
||||
if not _reload_requested:
|
||||
break
|
||||
|
||||
# Reload configuration — emit the log message here (not in the
|
||||
# signal handler, which is not async-signal-safe), then clear the
|
||||
# flag so that any new SIGHUP arriving while we reload will be
|
||||
# captured for the next iteration rather than being silently dropped.
|
||||
logger.info("SIGHUP received, config will reload after current batch")
|
||||
# Reload configuration — clear the flag first so that any new
|
||||
# SIGHUP arriving while we reload will be captured for the next
|
||||
# iteration rather than being silently dropped.
|
||||
_reload_requested = False
|
||||
logger.info("Reloading configuration...")
|
||||
try:
|
||||
|
||||
@@ -1,3 +1,3 @@
|
||||
__version__ = "9.3.0"
|
||||
__version__ = "9.2.1"
|
||||
|
||||
USER_AGENT = f"parsedmarc/{__version__}"
|
||||
|
||||
@@ -3,7 +3,9 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import logging.handlers
|
||||
import threading
|
||||
from typing import Any
|
||||
|
||||
from pygelf import GelfTcpHandler, GelfTlsHandler, GelfUdpHandler
|
||||
|
||||
@@ -12,7 +14,6 @@ from parsedmarc import (
|
||||
parsed_forensic_reports_to_csv_rows,
|
||||
parsed_smtp_tls_reports_to_csv_rows,
|
||||
)
|
||||
from parsedmarc.types import AggregateReport, ForensicReport, SMTPTLSReport
|
||||
|
||||
log_context_data = threading.local()
|
||||
|
||||
@@ -36,7 +37,7 @@ class GelfClient(object):
|
||||
"""
|
||||
self.host = host
|
||||
self.port = port
|
||||
self.logger = logging.getLogger("parsedmarc_gelf")
|
||||
self.logger = logging.getLogger("parsedmarc_syslog")
|
||||
self.logger.setLevel(logging.INFO)
|
||||
self.logger.addFilter(ContextFilter())
|
||||
self.gelf_mode = {
|
||||
@@ -49,7 +50,7 @@ class GelfClient(object):
|
||||
)
|
||||
self.logger.addHandler(self.handler)
|
||||
|
||||
def save_aggregate_report_to_gelf(self, aggregate_reports: list[AggregateReport]):
|
||||
def save_aggregate_report_to_gelf(self, aggregate_reports: list[dict[str, Any]]):
|
||||
rows = parsed_aggregate_reports_to_csv_rows(aggregate_reports)
|
||||
for row in rows:
|
||||
log_context_data.parsedmarc = row
|
||||
@@ -57,13 +58,13 @@ class GelfClient(object):
|
||||
|
||||
log_context_data.parsedmarc = None
|
||||
|
||||
def save_forensic_report_to_gelf(self, forensic_reports: list[ForensicReport]):
|
||||
def save_forensic_report_to_gelf(self, forensic_reports: list[dict[str, Any]]):
|
||||
rows = parsed_forensic_reports_to_csv_rows(forensic_reports)
|
||||
for row in rows:
|
||||
log_context_data.parsedmarc = row
|
||||
self.logger.info("parsedmarc forensic report")
|
||||
|
||||
def save_smtp_tls_report_to_gelf(self, smtp_tls_reports: SMTPTLSReport):
|
||||
def save_smtp_tls_report_to_gelf(self, smtp_tls_reports: dict[str, Any]):
|
||||
rows = parsed_smtp_tls_reports_to_csv_rows(smtp_tls_reports)
|
||||
for row in rows:
|
||||
log_context_data.parsedmarc = row
|
||||
|
||||
@@ -175,13 +175,13 @@ class GmailConnection(MailboxConnection):
|
||||
# Not needed
|
||||
pass
|
||||
|
||||
def watch(self, check_callback, check_timeout, config_reloading=None):
|
||||
def watch(self, check_callback, check_timeout, should_reload=None):
|
||||
"""Checks the mailbox for new messages every n seconds"""
|
||||
while True:
|
||||
if config_reloading and config_reloading():
|
||||
if should_reload and should_reload():
|
||||
return
|
||||
sleep(check_timeout)
|
||||
if config_reloading and config_reloading():
|
||||
if should_reload and should_reload():
|
||||
return
|
||||
check_callback(self)
|
||||
|
||||
|
||||
@@ -278,14 +278,12 @@ class MSGraphConnection(MailboxConnection):
|
||||
# Not needed
|
||||
pass
|
||||
|
||||
def watch(self, check_callback, check_timeout, config_reloading=None):
|
||||
def watch(self, check_callback, check_timeout, should_reload=None):
|
||||
"""Checks the mailbox for new messages every n seconds"""
|
||||
while True:
|
||||
if config_reloading and config_reloading():
|
||||
if should_reload and should_reload():
|
||||
return
|
||||
sleep(check_timeout)
|
||||
if config_reloading and config_reloading():
|
||||
return
|
||||
check_callback(self)
|
||||
|
||||
@lru_cache(maxsize=10)
|
||||
|
||||
@@ -81,7 +81,7 @@ class IMAPConnection(MailboxConnection):
|
||||
def keepalive(self):
|
||||
self._client.noop()
|
||||
|
||||
def watch(self, check_callback, check_timeout, config_reloading=None):
|
||||
def watch(self, check_callback, check_timeout, should_reload=None):
|
||||
"""
|
||||
Use an IDLE IMAP connection to parse incoming emails,
|
||||
and pass the results to a callback function
|
||||
@@ -94,8 +94,6 @@ class IMAPConnection(MailboxConnection):
|
||||
check_callback(self)
|
||||
|
||||
while True:
|
||||
if config_reloading and config_reloading():
|
||||
return
|
||||
try:
|
||||
IMAPClient(
|
||||
host=self._client.host,
|
||||
@@ -113,5 +111,5 @@ class IMAPConnection(MailboxConnection):
|
||||
except Exception as e:
|
||||
logger.warning("IMAP connection error. {0}. Reconnecting...".format(e))
|
||||
sleep(check_timeout)
|
||||
if config_reloading and config_reloading():
|
||||
if should_reload and should_reload():
|
||||
return
|
||||
|
||||
@@ -28,5 +28,5 @@ class MailboxConnection(ABC):
|
||||
def keepalive(self):
|
||||
raise NotImplementedError
|
||||
|
||||
def watch(self, check_callback, check_timeout, config_reloading=None):
|
||||
def watch(self, check_callback, check_timeout, should_reload=None):
|
||||
raise NotImplementedError
|
||||
|
||||
@@ -63,14 +63,12 @@ class MaildirConnection(MailboxConnection):
|
||||
def keepalive(self):
|
||||
return
|
||||
|
||||
def watch(self, check_callback, check_timeout, config_reloading=None):
|
||||
def watch(self, check_callback, check_timeout, should_reload=None):
|
||||
while True:
|
||||
if config_reloading and config_reloading():
|
||||
return
|
||||
try:
|
||||
check_callback(self)
|
||||
except Exception as e:
|
||||
logger.warning("Maildir init error. {0}".format(e))
|
||||
if config_reloading and config_reloading():
|
||||
if should_reload and should_reload():
|
||||
return
|
||||
sleep(check_timeout)
|
||||
|
||||
@@ -93,11 +93,3 @@ class S3Client(object):
|
||||
self.bucket.put_object(
|
||||
Body=json.dumps(report), Key=object_path, Metadata=object_metadata
|
||||
)
|
||||
|
||||
def close(self):
|
||||
"""Clean up the boto3 resource."""
|
||||
try:
|
||||
if self.s3.meta is not None:
|
||||
self.s3.meta.client.close()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
@@ -207,7 +207,3 @@ class HECClient(object):
|
||||
raise SplunkError(e.__str__())
|
||||
if response["code"] != 0:
|
||||
raise SplunkError(response["text"])
|
||||
|
||||
def close(self):
|
||||
"""Close the underlying HTTP session."""
|
||||
self.session.close()
|
||||
|
||||
96
tests.py
96
tests.py
@@ -12,11 +12,10 @@ from base64 import urlsafe_b64encode
|
||||
from glob import glob
|
||||
from pathlib import Path
|
||||
from tempfile import NamedTemporaryFile, TemporaryDirectory
|
||||
from typing import cast
|
||||
from types import SimpleNamespace
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
from lxml import etree # type: ignore[import-untyped]
|
||||
from lxml import etree
|
||||
from googleapiclient.errors import HttpError
|
||||
from httplib2 import Response
|
||||
from imapclient.exceptions import IMAPClientError
|
||||
@@ -33,7 +32,6 @@ from parsedmarc.mail.imap import IMAPConnection
|
||||
import parsedmarc.mail.gmail as gmail_module
|
||||
import parsedmarc.mail.graph as graph_module
|
||||
import parsedmarc.mail.imap as imap_module
|
||||
import parsedmarc.elastic
|
||||
import parsedmarc.opensearch as opensearch_module
|
||||
import parsedmarc.utils
|
||||
|
||||
@@ -156,7 +154,7 @@ class Test(unittest.TestCase):
|
||||
report_path,
|
||||
offline=True,
|
||||
)
|
||||
assert result["report_type"] == "aggregate"
|
||||
self.assertEqual(result["report_type"], "aggregate")
|
||||
self.assertEqual(result["report"]["report_metadata"]["org_name"], "outlook.com")
|
||||
|
||||
def testParseReportFileAcceptsPathForEmail(self):
|
||||
@@ -167,7 +165,7 @@ class Test(unittest.TestCase):
|
||||
report_path,
|
||||
offline=True,
|
||||
)
|
||||
assert result["report_type"] == "aggregate"
|
||||
self.assertEqual(result["report_type"], "aggregate")
|
||||
self.assertEqual(result["report"]["report_metadata"]["org_name"], "google.com")
|
||||
|
||||
def testAggregateSamples(self):
|
||||
@@ -178,11 +176,10 @@ class Test(unittest.TestCase):
|
||||
if os.path.isdir(sample_path):
|
||||
continue
|
||||
print("Testing {0}: ".format(sample_path), end="")
|
||||
result = parsedmarc.parse_report_file(
|
||||
parsed_report = parsedmarc.parse_report_file(
|
||||
sample_path, always_use_local_files=True, offline=OFFLINE_MODE
|
||||
)
|
||||
assert result["report_type"] == "aggregate"
|
||||
parsedmarc.parsed_aggregate_reports_to_csv(result["report"])
|
||||
)["report"]
|
||||
parsedmarc.parsed_aggregate_reports_to_csv(parsed_report)
|
||||
print("Passed!")
|
||||
|
||||
def testEmptySample(self):
|
||||
@@ -198,13 +195,13 @@ class Test(unittest.TestCase):
|
||||
print("Testing {0}: ".format(sample_path), end="")
|
||||
with open(sample_path) as sample_file:
|
||||
sample_content = sample_file.read()
|
||||
email_result = parsedmarc.parse_report_email(
|
||||
parsed_report = parsedmarc.parse_report_email(
|
||||
sample_content, offline=OFFLINE_MODE
|
||||
)
|
||||
assert email_result["report_type"] == "forensic"
|
||||
result = parsedmarc.parse_report_file(sample_path, offline=OFFLINE_MODE)
|
||||
assert result["report_type"] == "forensic"
|
||||
parsedmarc.parsed_forensic_reports_to_csv(result["report"])
|
||||
)["report"]
|
||||
parsed_report = parsedmarc.parse_report_file(
|
||||
sample_path, offline=OFFLINE_MODE
|
||||
)["report"]
|
||||
parsedmarc.parsed_forensic_reports_to_csv(parsed_report)
|
||||
print("Passed!")
|
||||
|
||||
def testSmtpTlsSamples(self):
|
||||
@@ -215,9 +212,10 @@ class Test(unittest.TestCase):
|
||||
if os.path.isdir(sample_path):
|
||||
continue
|
||||
print("Testing {0}: ".format(sample_path), end="")
|
||||
result = parsedmarc.parse_report_file(sample_path, offline=OFFLINE_MODE)
|
||||
assert result["report_type"] == "smtp_tls"
|
||||
parsedmarc.parsed_smtp_tls_reports_to_csv(result["report"])
|
||||
parsed_report = parsedmarc.parse_report_file(
|
||||
sample_path, offline=OFFLINE_MODE
|
||||
)["report"]
|
||||
parsedmarc.parsed_smtp_tls_reports_to_csv(parsed_report)
|
||||
print("Passed!")
|
||||
|
||||
def testOpenSearchSigV4RequiresRegion(self):
|
||||
@@ -1280,7 +1278,7 @@ class TestMailboxWatchSince(unittest.TestCase):
|
||||
def testWatchInboxPassesSinceToMailboxFetch(self):
|
||||
mailbox_connection = SimpleNamespace()
|
||||
|
||||
def fake_watch(check_callback, check_timeout, config_reloading=None):
|
||||
def fake_watch(check_callback, check_timeout, should_reload=None):
|
||||
check_callback(mailbox_connection)
|
||||
raise _BreakLoop()
|
||||
|
||||
@@ -1291,9 +1289,7 @@ class TestMailboxWatchSince(unittest.TestCase):
|
||||
) as mocked:
|
||||
with self.assertRaises(_BreakLoop):
|
||||
parsedmarc.watch_inbox(
|
||||
mailbox_connection=cast(
|
||||
parsedmarc.MailboxConnection, mailbox_connection
|
||||
),
|
||||
mailbox_connection=mailbox_connection,
|
||||
callback=callback,
|
||||
check_timeout=1,
|
||||
batch_size=10,
|
||||
@@ -1341,30 +1337,30 @@ since = 2d
|
||||
self.assertEqual(mock_watch_inbox.call_args.kwargs.get("since"), "2d")
|
||||
|
||||
|
||||
class _DummyMailboxConnection(parsedmarc.MailboxConnection):
|
||||
class _DummyMailboxConnection:
|
||||
def __init__(self):
|
||||
self.fetch_calls: list[dict[str, object]] = []
|
||||
self.fetch_calls = []
|
||||
|
||||
def create_folder(self, folder_name: str):
|
||||
def create_folder(self, folder_name):
|
||||
return None
|
||||
|
||||
def fetch_messages(self, reports_folder: str, **kwargs):
|
||||
def fetch_messages(self, reports_folder, **kwargs):
|
||||
self.fetch_calls.append({"reports_folder": reports_folder, **kwargs})
|
||||
return []
|
||||
|
||||
def fetch_message(self, message_id) -> str:
|
||||
def fetch_message(self, message_id, **kwargs):
|
||||
return ""
|
||||
|
||||
def delete_message(self, message_id):
|
||||
return None
|
||||
|
||||
def move_message(self, message_id, folder_name: str):
|
||||
def move_message(self, message_id, folder_name):
|
||||
return None
|
||||
|
||||
def keepalive(self):
|
||||
return None
|
||||
|
||||
def watch(self, check_callback, check_timeout, config_reloading=None):
|
||||
def watch(self, check_callback, check_timeout):
|
||||
return None
|
||||
|
||||
|
||||
@@ -1450,7 +1446,7 @@ mailbox = shared@example.com
|
||||
parsedmarc.cli._main()
|
||||
|
||||
self.assertEqual(system_exit.exception.code, -1)
|
||||
mock_logger.critical.assert_called_once_with(
|
||||
mock_logger.error.assert_called_once_with(
|
||||
"certificate_path setting missing from the msgraph config section"
|
||||
)
|
||||
mock_graph_connection.assert_not_called()
|
||||
@@ -1522,7 +1518,7 @@ user = owner@example.com
|
||||
parsedmarc.cli._main()
|
||||
|
||||
self.assertEqual(system_exit.exception.code, -1)
|
||||
mock_logger.critical.assert_called_once_with(
|
||||
mock_logger.error.assert_called_once_with(
|
||||
"password setting missing from the msgraph config section"
|
||||
)
|
||||
mock_graph_connection.assert_not_called()
|
||||
@@ -1563,7 +1559,7 @@ class TestMSGraphFolderFallback(unittest.TestCase):
|
||||
def testWellKnownFolderFallback(self):
|
||||
connection = MSGraphConnection.__new__(MSGraphConnection)
|
||||
connection.mailbox_name = "shared@example.com"
|
||||
connection._client = _FakeGraphClient() # type: ignore[assignment]
|
||||
connection._client = _FakeGraphClient()
|
||||
connection._request_with_retries = MagicMock(
|
||||
side_effect=lambda method_name, *args, **kwargs: getattr(
|
||||
connection._client, method_name
|
||||
@@ -1583,7 +1579,7 @@ class TestMSGraphFolderFallback(unittest.TestCase):
|
||||
def testUnknownFolderStillFails(self):
|
||||
connection = MSGraphConnection.__new__(MSGraphConnection)
|
||||
connection.mailbox_name = "shared@example.com"
|
||||
connection._client = _FakeGraphClient() # type: ignore[assignment]
|
||||
connection._client = _FakeGraphClient()
|
||||
connection._request_with_retries = MagicMock(
|
||||
side_effect=lambda method_name, *args, **kwargs: getattr(
|
||||
connection._client, method_name
|
||||
@@ -1679,7 +1675,7 @@ mailbox = shared@example.com
|
||||
parsedmarc.cli._main()
|
||||
|
||||
self.assertEqual(system_exit.exception.code, -1)
|
||||
mock_logger.critical.assert_called_once_with(
|
||||
mock_logger.error.assert_called_once_with(
|
||||
"client_secret setting missing from the msgraph config section"
|
||||
)
|
||||
mock_graph_connection.assert_not_called()
|
||||
@@ -1711,7 +1707,7 @@ mailbox = shared@example.com
|
||||
parsedmarc.cli._main()
|
||||
|
||||
self.assertEqual(system_exit.exception.code, -1)
|
||||
mock_logger.critical.assert_called_once_with(
|
||||
mock_logger.error.assert_called_once_with(
|
||||
"tenant_id setting missing from the msgraph config section"
|
||||
)
|
||||
mock_graph_connection.assert_not_called()
|
||||
@@ -1743,7 +1739,7 @@ tenant_id = tenant-id
|
||||
parsedmarc.cli._main()
|
||||
|
||||
self.assertEqual(system_exit.exception.code, -1)
|
||||
mock_logger.critical.assert_called_once_with(
|
||||
mock_logger.error.assert_called_once_with(
|
||||
"mailbox setting missing from the msgraph config section"
|
||||
)
|
||||
mock_graph_connection.assert_not_called()
|
||||
@@ -1813,7 +1809,7 @@ mailbox = shared@example.com
|
||||
parsedmarc.cli._main()
|
||||
|
||||
self.assertEqual(system_exit.exception.code, -1)
|
||||
mock_logger.critical.assert_called_once_with(
|
||||
mock_logger.error.assert_called_once_with(
|
||||
"tenant_id setting missing from the msgraph config section"
|
||||
)
|
||||
mock_graph_connection.assert_not_called()
|
||||
@@ -1844,7 +1840,7 @@ tenant_id = tenant-id
|
||||
parsedmarc.cli._main()
|
||||
|
||||
self.assertEqual(system_exit.exception.code, -1)
|
||||
mock_logger.critical.assert_called_once_with(
|
||||
mock_logger.error.assert_called_once_with(
|
||||
"mailbox setting missing from the msgraph config section"
|
||||
)
|
||||
mock_graph_connection.assert_not_called()
|
||||
@@ -1876,7 +1872,7 @@ certificate_path = /tmp/msgraph-cert.pem
|
||||
parsedmarc.cli._main()
|
||||
|
||||
self.assertEqual(system_exit.exception.code, -1)
|
||||
mock_logger.critical.assert_called_once_with(
|
||||
mock_logger.error.assert_called_once_with(
|
||||
"tenant_id setting missing from the msgraph config section"
|
||||
)
|
||||
mock_graph_connection.assert_not_called()
|
||||
@@ -1908,7 +1904,7 @@ certificate_path = /tmp/msgraph-cert.pem
|
||||
parsedmarc.cli._main()
|
||||
|
||||
self.assertEqual(system_exit.exception.code, -1)
|
||||
mock_logger.critical.assert_called_once_with(
|
||||
mock_logger.error.assert_called_once_with(
|
||||
"mailbox setting missing from the msgraph config section"
|
||||
)
|
||||
mock_graph_connection.assert_not_called()
|
||||
@@ -1930,10 +1926,7 @@ password = pass
|
||||
watch = true
|
||||
"""
|
||||
|
||||
@unittest.skipUnless(
|
||||
hasattr(signal, "SIGHUP"),
|
||||
"SIGHUP not available on this platform",
|
||||
)
|
||||
@unittest.skipUnless(hasattr(signal, "SIGHUP"), "SIGHUP not available on this platform")
|
||||
@patch("parsedmarc.cli._init_output_clients")
|
||||
@patch("parsedmarc.cli._parse_config_file")
|
||||
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
|
||||
@@ -1999,10 +1992,7 @@ watch = true
|
||||
# _parse_config_file called for initial load + reload
|
||||
self.assertGreaterEqual(mock_parse_config.call_count, 2)
|
||||
|
||||
@unittest.skipUnless(
|
||||
hasattr(signal, "SIGHUP"),
|
||||
"SIGHUP not available on this platform",
|
||||
)
|
||||
@unittest.skipUnless(hasattr(signal, "SIGHUP"), "SIGHUP not available on this platform")
|
||||
@patch("parsedmarc.cli._init_output_clients")
|
||||
@patch("parsedmarc.cli._parse_config_file")
|
||||
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
|
||||
@@ -2075,10 +2065,7 @@ watch = true
|
||||
# The failed reload must not have closed the original clients
|
||||
initial_clients["s3_client"].close.assert_not_called()
|
||||
|
||||
@unittest.skipUnless(
|
||||
hasattr(signal, "SIGHUP"),
|
||||
"SIGHUP not available on this platform",
|
||||
)
|
||||
@unittest.skipUnless(hasattr(signal, "SIGHUP"), "SIGHUP not available on this platform")
|
||||
@patch("parsedmarc.cli._init_output_clients")
|
||||
@patch("parsedmarc.cli._parse_config_file")
|
||||
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
|
||||
@@ -2150,10 +2137,7 @@ watch = true
|
||||
# Old client must have been closed when reload succeeded
|
||||
old_client.close.assert_called_once()
|
||||
|
||||
@unittest.skipUnless(
|
||||
hasattr(signal, "SIGHUP"),
|
||||
"SIGHUP not available on this platform",
|
||||
)
|
||||
@unittest.skipUnless(hasattr(signal, "SIGHUP"), "SIGHUP not available on this platform")
|
||||
@patch("parsedmarc.cli._init_output_clients")
|
||||
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
|
||||
@patch("parsedmarc.cli.watch_inbox")
|
||||
|
||||
Reference in New Issue
Block a user