mirror of
https://github.com/domainaware/parsedmarc.git
synced 2026-03-26 08:22:45 +00:00
Compare commits
11 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
12c4676b79 | ||
|
|
cda039ee27 | ||
|
|
ff0ca6538c | ||
|
|
2032438d3b | ||
|
|
1e95c5d30b | ||
|
|
cb2384be83 | ||
|
|
9a5b5310fa | ||
|
|
9849598100 | ||
|
|
e82f3e58a1 | ||
|
|
dd1a8fd461 | ||
|
|
81656c75e9 |
17
.claude/settings.json
Normal file
17
.claude/settings.json
Normal file
@@ -0,0 +1,17 @@
|
||||
{
|
||||
"permissions": {
|
||||
"allow": [
|
||||
"Bash(python -c \"import py_compile; py_compile.compile\\(''parsedmarc/cli.py'', doraise=True\\)\")",
|
||||
"Bash(ruff check:*)",
|
||||
"Bash(ruff format:*)",
|
||||
"Bash(GITHUB_ACTIONS=true pytest --cov tests.py)",
|
||||
"Bash(ls tests*)",
|
||||
"Bash(GITHUB_ACTIONS=true python -m pytest --cov tests.py -x)",
|
||||
"Bash(GITHUB_ACTIONS=true python -m pytest tests.py -x -v)",
|
||||
"Bash(python -m pytest tests.py --no-header -q)"
|
||||
],
|
||||
"additionalDirectories": [
|
||||
"/tmp"
|
||||
]
|
||||
}
|
||||
}
|
||||
2
.gitignore
vendored
2
.gitignore
vendored
@@ -137,7 +137,7 @@ samples/private
|
||||
*.html
|
||||
*.sqlite-journal
|
||||
|
||||
parsedmarc.ini
|
||||
parsedmarc*.ini
|
||||
scratch.py
|
||||
|
||||
parsedmarc/resources/maps/base_reverse_dns.csv
|
||||
|
||||
66
CHANGELOG.md
66
CHANGELOG.md
@@ -1,10 +1,74 @@
|
||||
# Changelog
|
||||
|
||||
## 9.5.1
|
||||
|
||||
### Changes
|
||||
|
||||
- Correct ISO format for MSGraphConnection timestamps (PR #706)
|
||||
|
||||
## 9.5.0
|
||||
|
||||
### Added
|
||||
|
||||
- Environment variable configuration support: any config option can now be set via `PARSEDMARC_{SECTION}_{KEY}` environment variables (e.g. `PARSEDMARC_IMAP_PASSWORD`, `PARSEDMARC_SPLUNK_HEC_TOKEN`). Environment variables override config file values but are overridden by CLI arguments.
|
||||
- `PARSEDMARC_CONFIG_FILE` environment variable to specify the config file path without the `-c` flag.
|
||||
- Env-only mode: parsedmarc can now run without a config file when `PARSEDMARC_*` environment variables are set, enabling fully file-less Docker deployments.
|
||||
- Explicit read permission check on config file, giving a clear error message when the container UID cannot read the file (e.g. `chmod 600` with a UID mismatch).
|
||||
|
||||
## 9.4.0
|
||||
|
||||
### Added
|
||||
|
||||
- Extracted `load_reverse_dns_map()` utility function in `utils.py` for loading the reverse DNS map independently of individual IP lookups.
|
||||
- SIGHUP reload now re-downloads/reloads the reverse DNS map, so changes take effect without restarting.
|
||||
- Add premade OpenSearch index patterns, visualizations, and dashboards
|
||||
|
||||
### Changed
|
||||
|
||||
- When `index_prefix_domain_map` is configured, SMTP TLS reports for domains not in the map are now silently dropped instead of being output. Unlike DMARC, TLS-RPT has no DNS authorization records, so this filtering prevents processing reports for unrelated domains.
|
||||
- Bump OpenSearch support to `< 4`
|
||||
|
||||
### Fixed
|
||||
|
||||
- Fixed `get_index_prefix` using wrong key (`domain` instead of `policy_domain`) for SMTP TLS reports, which prevented domain map matching from working for TLS reports.
|
||||
- Domain matching in `get_index_prefix` now lowercases the domain for case-insensitive comparison.
|
||||
|
||||
## 9.3.1
|
||||
|
||||
### Breaking changes
|
||||
|
||||
- Elasticsearch and OpenSearch now verify SSL certificates by default when `ssl = True`, even without a `cert_path`
|
||||
- Added `skip_certificate_verification` option to the `elasticsearch` and `opensearch` configuration sections for consistency with `splunk_hec`
|
||||
|
||||
### Fixed
|
||||
|
||||
- Splunk HEC `skip_certificate_verification` now works correctly
|
||||
- SMTP TLS reports no longer fail when saving to multiple output targets (e.g. Elasticsearch and OpenSearch) due to in-place mutation of the report dict
|
||||
- Output client initialization errors now identify which module failed (e.g. "OpenSearch: ConnectionError..." instead of generic "Output client error")
|
||||
|
||||
## 9.3.0
|
||||
|
||||
### Added
|
||||
|
||||
- SIGHUP-based configuration reload for watch mode — update output destinations, DNS/GeoIP settings, processing flags, and log level without restarting the service or interrupting in-progress report processing.
|
||||
- Use `systemctl reload parsedmarc` when running under `systemd`.
|
||||
- On a successful reload, old output clients are closed and recreated.
|
||||
- On a failed reload, the previous configuration remains fully active.
|
||||
- `close()` methods on `GelfClient`, `KafkaClient`, `SyslogClient`, `WebhookClient`, HECClient, and `S3Client` for clean resource teardown on reload.
|
||||
- `config_reloading` parameter on all `MailboxConnection.watch()` implementations and `watch_inbox()` to ensure SIGHUP never triggers a new email batch mid-reload.
|
||||
- Elasticsearch and OpenSearch connections are now tracked and cleaned up on reload via `_close_output_clients()`.
|
||||
- Extracted `_parse_config_file()` and `_init_output_clients()` from `_main()` in `cli.py` to support config reload and reduce code duplication.
|
||||
|
||||
### Fixed
|
||||
|
||||
- `get_index_prefix()` crashed on forensic reports with `TypeError` due to `report()` instead of `report[]` dict access.
|
||||
- Missing `exit(1)` after IMAP user/password validation failure allowed execution to continue with `None` credentials.
|
||||
|
||||
## 9.2.1
|
||||
|
||||
### Added
|
||||
|
||||
- Better checking of `msconfig` configuration (PR #695)
|
||||
- Better checking of `msgraph` configuration (PR #695)
|
||||
|
||||
### Changed
|
||||
|
||||
|
||||
47
docker-compose.dashboard-dev.yml
Normal file
47
docker-compose.dashboard-dev.yml
Normal file
@@ -0,0 +1,47 @@
|
||||
name: parsedmarc-dashboards
|
||||
|
||||
include:
|
||||
- docker-compose.yml
|
||||
|
||||
services:
|
||||
kibana:
|
||||
image: docker.elastic.co/kibana/kibana:8.19.7
|
||||
environment:
|
||||
- ELASTICSEARCH_HOSTS=http://elasticsearch:9200
|
||||
ports:
|
||||
- "127.0.0.1:5601:5601"
|
||||
depends_on:
|
||||
elasticsearch:
|
||||
condition: service_healthy
|
||||
|
||||
opensearch-dashboards:
|
||||
image: opensearchproject/opensearch-dashboards:3
|
||||
environment:
|
||||
- OPENSEARCH_HOSTS=["https://opensearch:9200"]
|
||||
ports:
|
||||
- "127.0.0.1:5602:5601"
|
||||
depends_on:
|
||||
opensearch:
|
||||
condition: service_healthy
|
||||
|
||||
grafana:
|
||||
image: grafana/grafana:latest
|
||||
environment:
|
||||
- GRAFANA_PASSWORD=${GRAFANA_PASSWORD}
|
||||
- GF_INSTALL_PLUGINS=grafana-piechart-panel,grafana-worldmap-panel
|
||||
ports:
|
||||
- "127.0.0.1:3000:3000"
|
||||
depends_on:
|
||||
elasticsearch:
|
||||
condition: service_healthy
|
||||
|
||||
splunk:
|
||||
image: splunk/splunk:latest
|
||||
environment:
|
||||
- SPLUNK_START_ARGS=--accept-license
|
||||
- "SPLUNK_GENERAL_TERMS=--accept-sgt-current-at-splunk-com"
|
||||
- SPLUNK_PASSWORD=${SPLUNK_PASSWORD}
|
||||
- SPLUNK_HEC_TOKEN=${SPLUNK_HEC_TOKEN}
|
||||
ports:
|
||||
- "127.0.0.1:8000:8000"
|
||||
- "127.0.0.1:8088:8088"
|
||||
@@ -48,7 +48,7 @@ services:
|
||||
test:
|
||||
[
|
||||
"CMD-SHELL",
|
||||
"curl -s -XGET http://localhost:9201/_cluster/health?pretty | grep status | grep -q '\\(green\\|yellow\\)'"
|
||||
"curl -sk -u admin:${OPENSEARCH_INITIAL_ADMIN_PASSWORD} -XGET https://localhost:9200/_cluster/health?pretty | grep status | grep -q '\\(green\\|yellow\\)'"
|
||||
]
|
||||
interval: 10s
|
||||
timeout: 10s
|
||||
|
||||
@@ -273,6 +273,8 @@ The full set of configuration options are:
|
||||
(Default: `True`)
|
||||
- `timeout` - float: Timeout in seconds (Default: 60)
|
||||
- `cert_path` - str: Path to a trusted certificates
|
||||
- `skip_certificate_verification` - bool: Skip certificate
|
||||
verification (not recommended)
|
||||
- `index_suffix` - str: A suffix to apply to the index names
|
||||
- `index_prefix` - str: A prefix to apply to the index names
|
||||
- `monthly_indexes` - bool: Use monthly indexes instead of daily indexes
|
||||
@@ -300,6 +302,8 @@ The full set of configuration options are:
|
||||
(Default: `True`)
|
||||
- `timeout` - float: Timeout in seconds (Default: 60)
|
||||
- `cert_path` - str: Path to a trusted certificates
|
||||
- `skip_certificate_verification` - bool: Skip certificate
|
||||
verification (not recommended)
|
||||
- `index_suffix` - str: A suffix to apply to the index names
|
||||
- `index_prefix` - str: A prefix to apply to the index names
|
||||
- `monthly_indexes` - bool: Use monthly indexes instead of daily indexes
|
||||
@@ -404,6 +408,7 @@ The full set of configuration options are:
|
||||
retry_attempts = 3
|
||||
retry_delay = 5
|
||||
```
|
||||
|
||||
- `gmail_api`
|
||||
- `credentials_file` - str: Path to file containing the
|
||||
credentials, None to disable (Default: `None`)
|
||||
@@ -442,7 +447,7 @@ The full set of configuration options are:
|
||||
- `dcr_smtp_tls_stream` - str: The stream name for the SMTP TLS reports in the DCR
|
||||
|
||||
:::{note}
|
||||
Information regarding the setup of the Data Collection Rule can be found [here](https://learn.microsoft.com/en-us/azure/azure-monitor/logs/tutorial-logs-ingestion-portal).
|
||||
Information regarding the setup of the Data Collection Rule can be found [in the Azure documentation](https://learn.microsoft.com/en-us/azure/azure-monitor/logs/tutorial-logs-ingestion-portal).
|
||||
:::
|
||||
- `gelf`
|
||||
- `host` - str: The GELF server name or IP address
|
||||
@@ -526,6 +531,96 @@ PUT _cluster/settings
|
||||
Increasing this value increases resource usage.
|
||||
:::
|
||||
|
||||
## Environment variable configuration
|
||||
|
||||
Any configuration option can be set via environment variables using the
|
||||
naming convention `PARSEDMARC_{SECTION}_{KEY}` (uppercase). This is
|
||||
especially useful for Docker deployments where file permissions make it
|
||||
difficult to use config files for secrets.
|
||||
|
||||
**Priority order:** CLI arguments > environment variables > config file > defaults
|
||||
|
||||
### Examples
|
||||
|
||||
```bash
|
||||
# Set IMAP credentials via env vars
|
||||
export PARSEDMARC_IMAP_HOST=imap.example.com
|
||||
export PARSEDMARC_IMAP_USER=dmarc@example.com
|
||||
export PARSEDMARC_IMAP_PASSWORD=secret
|
||||
|
||||
# Elasticsearch
|
||||
export PARSEDMARC_ELASTICSEARCH_HOSTS=http://localhost:9200
|
||||
export PARSEDMARC_ELASTICSEARCH_SSL=false
|
||||
|
||||
# Splunk HEC (note: section name splunk_hec becomes SPLUNK_HEC)
|
||||
export PARSEDMARC_SPLUNK_HEC_URL=https://splunk.example.com
|
||||
export PARSEDMARC_SPLUNK_HEC_TOKEN=my-hec-token
|
||||
export PARSEDMARC_SPLUNK_HEC_INDEX=email
|
||||
|
||||
# General settings
|
||||
export PARSEDMARC_GENERAL_SAVE_AGGREGATE=true
|
||||
export PARSEDMARC_GENERAL_DEBUG=true
|
||||
```
|
||||
|
||||
### Specifying the config file via environment variable
|
||||
|
||||
```bash
|
||||
export PARSEDMARC_CONFIG_FILE=/etc/parsedmarc.ini
|
||||
parsedmarc
|
||||
```
|
||||
|
||||
### Running without a config file (env-only mode)
|
||||
|
||||
When no config file is given (neither `-c` flag nor `PARSEDMARC_CONFIG_FILE`),
|
||||
parsedmarc will still pick up any `PARSEDMARC_*` environment variables. This
|
||||
enables fully file-less deployments:
|
||||
|
||||
```bash
|
||||
export PARSEDMARC_GENERAL_SAVE_AGGREGATE=true
|
||||
export PARSEDMARC_GENERAL_OFFLINE=true
|
||||
export PARSEDMARC_ELASTICSEARCH_HOSTS=http://elasticsearch:9200
|
||||
parsedmarc /path/to/reports/*
|
||||
```
|
||||
|
||||
### Docker Compose example
|
||||
|
||||
```yaml
|
||||
services:
|
||||
parsedmarc:
|
||||
image: parsedmarc:latest
|
||||
environment:
|
||||
PARSEDMARC_IMAP_HOST: imap.example.com
|
||||
PARSEDMARC_IMAP_USER: dmarc@example.com
|
||||
PARSEDMARC_IMAP_PASSWORD: ${IMAP_PASSWORD}
|
||||
PARSEDMARC_MAILBOX_WATCH: "true"
|
||||
PARSEDMARC_ELASTICSEARCH_HOSTS: http://elasticsearch:9200
|
||||
PARSEDMARC_GENERAL_SAVE_AGGREGATE: "true"
|
||||
PARSEDMARC_GENERAL_SAVE_FORENSIC: "true"
|
||||
```
|
||||
|
||||
### Section name mapping
|
||||
|
||||
For sections with underscores in the name, the full section name is used:
|
||||
|
||||
| Section | Env var prefix |
|
||||
|------------------|-------------------------------|
|
||||
| `general` | `PARSEDMARC_GENERAL_` |
|
||||
| `mailbox` | `PARSEDMARC_MAILBOX_` |
|
||||
| `imap` | `PARSEDMARC_IMAP_` |
|
||||
| `msgraph` | `PARSEDMARC_MSGRAPH_` |
|
||||
| `elasticsearch` | `PARSEDMARC_ELASTICSEARCH_` |
|
||||
| `opensearch` | `PARSEDMARC_OPENSEARCH_` |
|
||||
| `splunk_hec` | `PARSEDMARC_SPLUNK_HEC_` |
|
||||
| `kafka` | `PARSEDMARC_KAFKA_` |
|
||||
| `smtp` | `PARSEDMARC_SMTP_` |
|
||||
| `s3` | `PARSEDMARC_S3_` |
|
||||
| `syslog` | `PARSEDMARC_SYSLOG_` |
|
||||
| `gmail_api` | `PARSEDMARC_GMAIL_API_` |
|
||||
| `maildir` | `PARSEDMARC_MAILDIR_` |
|
||||
| `log_analytics` | `PARSEDMARC_LOG_ANALYTICS_` |
|
||||
| `gelf` | `PARSEDMARC_GELF_` |
|
||||
| `webhook` | `PARSEDMARC_WEBHOOK_` |
|
||||
|
||||
## Performance tuning
|
||||
|
||||
For large mailbox imports or backfills, parsedmarc can consume a noticeable amount
|
||||
@@ -602,6 +697,7 @@ After=network.target network-online.target elasticsearch.service
|
||||
|
||||
[Service]
|
||||
ExecStart=/opt/parsedmarc/venv/bin/parsedmarc -c /etc/parsedmarc.ini
|
||||
ExecReload=/bin/kill -HUP $MAINPID
|
||||
User=parsedmarc
|
||||
Group=parsedmarc
|
||||
Restart=always
|
||||
@@ -634,6 +730,51 @@ sudo service parsedmarc restart
|
||||
|
||||
:::
|
||||
|
||||
### Reloading configuration without restarting
|
||||
|
||||
When running in watch mode, `parsedmarc` supports reloading its
|
||||
configuration file without restarting the service or interrupting
|
||||
report processing that is already in progress. Send a `SIGHUP` signal
|
||||
to the process, or use `systemctl reload` if the unit file includes
|
||||
the `ExecReload` line shown above:
|
||||
|
||||
```bash
|
||||
sudo systemctl reload parsedmarc
|
||||
```
|
||||
|
||||
The reload takes effect after the current batch of reports finishes
|
||||
processing and all output operations (Elasticsearch, Kafka, S3, etc.)
|
||||
for that batch have completed. The following settings are reloaded:
|
||||
|
||||
- All output destinations (Elasticsearch, OpenSearch, Kafka, S3,
|
||||
Splunk, syslog, GELF, webhooks, Log Analytics)
|
||||
- Multi-tenant index prefix domain map (`index_prefix_domain_map` —
|
||||
the referenced YAML file is re-read on reload)
|
||||
- DNS and GeoIP settings (`nameservers`, `dns_timeout`, `ip_db_path`,
|
||||
`offline`, etc.)
|
||||
- Processing flags (`strip_attachment_payloads`, `batch_size`,
|
||||
`check_timeout`, etc.)
|
||||
- Log level (`debug`, `verbose`, `warnings`, `silent`)
|
||||
|
||||
Mailbox connection settings (IMAP host/credentials, Microsoft Graph,
|
||||
Gmail API, Maildir path) are **not** reloaded — changing those still
|
||||
requires a full restart.
|
||||
|
||||
On a **successful** reload, existing output client connections are
|
||||
closed and new ones are created from the updated configuration. The
|
||||
service then resumes watching with the new settings.
|
||||
|
||||
If the new configuration file contains errors (missing required
|
||||
settings, unreachable output destinations, etc.), the **entire reload
|
||||
is aborted** — no output clients are replaced and the previous
|
||||
configuration remains fully active. This means a typo in one section
|
||||
will not take down an otherwise working setup. Check the logs for
|
||||
details:
|
||||
|
||||
```bash
|
||||
journalctl -u parsedmarc.service -r
|
||||
```
|
||||
|
||||
To check the status of the service, run:
|
||||
|
||||
```bash
|
||||
|
||||
28
opensearch/opensearch_dashboards.ndjson
Normal file
28
opensearch/opensearch_dashboards.ndjson
Normal file
File diff suppressed because one or more lines are too long
@@ -1957,7 +1957,7 @@ def get_dmarc_reports_from_mailbox(
|
||||
elif isinstance(connection, MSGraphConnection):
|
||||
since = (
|
||||
datetime.now(timezone.utc) - timedelta(minutes=_since)
|
||||
).isoformat() + "Z"
|
||||
).isoformat()
|
||||
current_time = datetime.now(timezone.utc).isoformat() + "Z"
|
||||
elif isinstance(connection, GmailConnection):
|
||||
since = (datetime.now(timezone.utc) - timedelta(minutes=_since)).strftime(
|
||||
@@ -2195,6 +2195,7 @@ def watch_inbox(
|
||||
batch_size: int = 10,
|
||||
since: Optional[Union[datetime, date, str]] = None,
|
||||
normalize_timespan_threshold_hours: float = 24,
|
||||
config_reloading: Optional[Callable] = None,
|
||||
):
|
||||
"""
|
||||
Watches the mailbox for new messages and
|
||||
@@ -2222,6 +2223,8 @@ def watch_inbox(
|
||||
batch_size (int): Number of messages to read and process before saving
|
||||
since: Search for messages since certain time
|
||||
normalize_timespan_threshold_hours (float): Normalize timespans beyond this
|
||||
config_reloading: Optional callable that returns True when a config
|
||||
reload has been requested (e.g. via SIGHUP)
|
||||
"""
|
||||
|
||||
def check_callback(connection):
|
||||
@@ -2246,7 +2249,14 @@ def watch_inbox(
|
||||
)
|
||||
callback(res)
|
||||
|
||||
mailbox_connection.watch(check_callback=check_callback, check_timeout=check_timeout)
|
||||
watch_kwargs: dict = {
|
||||
"check_callback": check_callback,
|
||||
"check_timeout": check_timeout,
|
||||
}
|
||||
if config_reloading is not None:
|
||||
watch_kwargs["config_reloading"] = config_reloading
|
||||
|
||||
mailbox_connection.watch(**watch_kwargs)
|
||||
|
||||
|
||||
def append_json(
|
||||
|
||||
2103
parsedmarc/cli.py
2103
parsedmarc/cli.py
File diff suppressed because it is too large
Load Diff
@@ -1,3 +1,3 @@
|
||||
__version__ = "9.2.1"
|
||||
__version__ = "9.5.1"
|
||||
|
||||
USER_AGENT = f"parsedmarc/{__version__}"
|
||||
|
||||
@@ -268,6 +268,7 @@ def set_hosts(
|
||||
*,
|
||||
use_ssl: bool = False,
|
||||
ssl_cert_path: Optional[str] = None,
|
||||
skip_certificate_verification: bool = False,
|
||||
username: Optional[str] = None,
|
||||
password: Optional[str] = None,
|
||||
api_key: Optional[str] = None,
|
||||
@@ -280,6 +281,7 @@ def set_hosts(
|
||||
hosts (str | list[str]): A single hostname or URL, or list of hostnames or URLs
|
||||
use_ssl (bool): Use an HTTPS connection to the server
|
||||
ssl_cert_path (str): Path to the certificate chain
|
||||
skip_certificate_verification (bool): Skip certificate verification
|
||||
username (str): The username to use for authentication
|
||||
password (str): The password to use for authentication
|
||||
api_key (str): The Base64 encoded API key to use for authentication
|
||||
@@ -291,10 +293,11 @@ def set_hosts(
|
||||
if use_ssl:
|
||||
conn_params["use_ssl"] = True
|
||||
if ssl_cert_path:
|
||||
conn_params["verify_certs"] = True
|
||||
conn_params["ca_certs"] = ssl_cert_path
|
||||
else:
|
||||
if skip_certificate_verification:
|
||||
conn_params["verify_certs"] = False
|
||||
else:
|
||||
conn_params["verify_certs"] = True
|
||||
if username and password:
|
||||
conn_params["http_auth"] = username + ":" + password
|
||||
if api_key:
|
||||
@@ -735,6 +738,7 @@ def save_smtp_tls_report_to_elasticsearch(
|
||||
index_date = begin_date.strftime("%Y-%m")
|
||||
else:
|
||||
index_date = begin_date.strftime("%Y-%m-%d")
|
||||
report = report.copy()
|
||||
report["begin_date"] = begin_date
|
||||
report["end_date"] = end_date
|
||||
|
||||
|
||||
@@ -3,9 +3,7 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import logging.handlers
|
||||
import threading
|
||||
from typing import Any
|
||||
|
||||
from pygelf import GelfTcpHandler, GelfTlsHandler, GelfUdpHandler
|
||||
|
||||
@@ -14,6 +12,7 @@ from parsedmarc import (
|
||||
parsed_forensic_reports_to_csv_rows,
|
||||
parsed_smtp_tls_reports_to_csv_rows,
|
||||
)
|
||||
from parsedmarc.types import AggregateReport, ForensicReport, SMTPTLSReport
|
||||
|
||||
log_context_data = threading.local()
|
||||
|
||||
@@ -37,7 +36,7 @@ class GelfClient(object):
|
||||
"""
|
||||
self.host = host
|
||||
self.port = port
|
||||
self.logger = logging.getLogger("parsedmarc_syslog")
|
||||
self.logger = logging.getLogger("parsedmarc_gelf")
|
||||
self.logger.setLevel(logging.INFO)
|
||||
self.logger.addFilter(ContextFilter())
|
||||
self.gelf_mode = {
|
||||
@@ -50,7 +49,7 @@ class GelfClient(object):
|
||||
)
|
||||
self.logger.addHandler(self.handler)
|
||||
|
||||
def save_aggregate_report_to_gelf(self, aggregate_reports: list[dict[str, Any]]):
|
||||
def save_aggregate_report_to_gelf(self, aggregate_reports: list[AggregateReport]):
|
||||
rows = parsed_aggregate_reports_to_csv_rows(aggregate_reports)
|
||||
for row in rows:
|
||||
log_context_data.parsedmarc = row
|
||||
@@ -58,14 +57,19 @@ class GelfClient(object):
|
||||
|
||||
log_context_data.parsedmarc = None
|
||||
|
||||
def save_forensic_report_to_gelf(self, forensic_reports: list[dict[str, Any]]):
|
||||
def save_forensic_report_to_gelf(self, forensic_reports: list[ForensicReport]):
|
||||
rows = parsed_forensic_reports_to_csv_rows(forensic_reports)
|
||||
for row in rows:
|
||||
log_context_data.parsedmarc = row
|
||||
self.logger.info("parsedmarc forensic report")
|
||||
|
||||
def save_smtp_tls_report_to_gelf(self, smtp_tls_reports: dict[str, Any]):
|
||||
def save_smtp_tls_report_to_gelf(self, smtp_tls_reports: SMTPTLSReport):
|
||||
rows = parsed_smtp_tls_reports_to_csv_rows(smtp_tls_reports)
|
||||
for row in rows:
|
||||
log_context_data.parsedmarc = row
|
||||
self.logger.info("parsedmarc smtptls report")
|
||||
|
||||
def close(self):
|
||||
"""Remove and close the GELF handler, releasing its connection."""
|
||||
self.logger.removeHandler(self.handler)
|
||||
self.handler.close()
|
||||
|
||||
@@ -62,6 +62,10 @@ class KafkaClient(object):
|
||||
except NoBrokersAvailable:
|
||||
raise KafkaError("No Kafka brokers available")
|
||||
|
||||
def close(self):
|
||||
"""Close the Kafka producer, releasing background threads and sockets."""
|
||||
self.producer.close()
|
||||
|
||||
@staticmethod
|
||||
def strip_metadata(report: dict[str, Any]):
|
||||
"""
|
||||
|
||||
@@ -175,10 +175,14 @@ class GmailConnection(MailboxConnection):
|
||||
# Not needed
|
||||
pass
|
||||
|
||||
def watch(self, check_callback, check_timeout):
|
||||
def watch(self, check_callback, check_timeout, config_reloading=None):
|
||||
"""Checks the mailbox for new messages every n seconds"""
|
||||
while True:
|
||||
if config_reloading and config_reloading():
|
||||
return
|
||||
sleep(check_timeout)
|
||||
if config_reloading and config_reloading():
|
||||
return
|
||||
check_callback(self)
|
||||
|
||||
@lru_cache(maxsize=10)
|
||||
|
||||
@@ -278,10 +278,14 @@ class MSGraphConnection(MailboxConnection):
|
||||
# Not needed
|
||||
pass
|
||||
|
||||
def watch(self, check_callback, check_timeout):
|
||||
def watch(self, check_callback, check_timeout, config_reloading=None):
|
||||
"""Checks the mailbox for new messages every n seconds"""
|
||||
while True:
|
||||
if config_reloading and config_reloading():
|
||||
return
|
||||
sleep(check_timeout)
|
||||
if config_reloading and config_reloading():
|
||||
return
|
||||
check_callback(self)
|
||||
|
||||
@lru_cache(maxsize=10)
|
||||
|
||||
@@ -81,7 +81,7 @@ class IMAPConnection(MailboxConnection):
|
||||
def keepalive(self):
|
||||
self._client.noop()
|
||||
|
||||
def watch(self, check_callback, check_timeout):
|
||||
def watch(self, check_callback, check_timeout, config_reloading=None):
|
||||
"""
|
||||
Use an IDLE IMAP connection to parse incoming emails,
|
||||
and pass the results to a callback function
|
||||
@@ -94,6 +94,8 @@ class IMAPConnection(MailboxConnection):
|
||||
check_callback(self)
|
||||
|
||||
while True:
|
||||
if config_reloading and config_reloading():
|
||||
return
|
||||
try:
|
||||
IMAPClient(
|
||||
host=self._client.host,
|
||||
@@ -111,3 +113,5 @@ class IMAPConnection(MailboxConnection):
|
||||
except Exception as e:
|
||||
logger.warning("IMAP connection error. {0}. Reconnecting...".format(e))
|
||||
sleep(check_timeout)
|
||||
if config_reloading and config_reloading():
|
||||
return
|
||||
|
||||
@@ -28,5 +28,5 @@ class MailboxConnection(ABC):
|
||||
def keepalive(self):
|
||||
raise NotImplementedError
|
||||
|
||||
def watch(self, check_callback, check_timeout):
|
||||
def watch(self, check_callback, check_timeout, config_reloading=None):
|
||||
raise NotImplementedError
|
||||
|
||||
@@ -63,10 +63,14 @@ class MaildirConnection(MailboxConnection):
|
||||
def keepalive(self):
|
||||
return
|
||||
|
||||
def watch(self, check_callback, check_timeout):
|
||||
def watch(self, check_callback, check_timeout, config_reloading=None):
|
||||
while True:
|
||||
if config_reloading and config_reloading():
|
||||
return
|
||||
try:
|
||||
check_callback(self)
|
||||
except Exception as e:
|
||||
logger.warning("Maildir init error. {0}".format(e))
|
||||
if config_reloading and config_reloading():
|
||||
return
|
||||
sleep(check_timeout)
|
||||
|
||||
@@ -271,6 +271,7 @@ def set_hosts(
|
||||
*,
|
||||
use_ssl: Optional[bool] = False,
|
||||
ssl_cert_path: Optional[str] = None,
|
||||
skip_certificate_verification: bool = False,
|
||||
username: Optional[str] = None,
|
||||
password: Optional[str] = None,
|
||||
api_key: Optional[str] = None,
|
||||
@@ -286,6 +287,7 @@ def set_hosts(
|
||||
hosts (str|list[str]): A single hostname or URL, or list of hostnames or URLs
|
||||
use_ssl (bool): Use an HTTPS connection to the server
|
||||
ssl_cert_path (str): Path to the certificate chain
|
||||
skip_certificate_verification (bool): Skip certificate verification
|
||||
username (str): The username to use for authentication
|
||||
password (str): The password to use for authentication
|
||||
api_key (str): The Base64 encoded API key to use for authentication
|
||||
@@ -300,10 +302,11 @@ def set_hosts(
|
||||
if use_ssl:
|
||||
conn_params["use_ssl"] = True
|
||||
if ssl_cert_path:
|
||||
conn_params["verify_certs"] = True
|
||||
conn_params["ca_certs"] = ssl_cert_path
|
||||
else:
|
||||
if skip_certificate_verification:
|
||||
conn_params["verify_certs"] = False
|
||||
else:
|
||||
conn_params["verify_certs"] = True
|
||||
normalized_auth_type = (auth_type or "basic").strip().lower()
|
||||
if normalized_auth_type == "awssigv4":
|
||||
if not aws_region:
|
||||
@@ -764,6 +767,7 @@ def save_smtp_tls_report_to_opensearch(
|
||||
index_date = begin_date.strftime("%Y-%m")
|
||||
else:
|
||||
index_date = begin_date.strftime("%Y-%m-%d")
|
||||
report = report.copy()
|
||||
report["begin_date"] = begin_date
|
||||
report["end_date"] = end_date
|
||||
|
||||
|
||||
@@ -93,3 +93,11 @@ class S3Client(object):
|
||||
self.bucket.put_object(
|
||||
Body=json.dumps(report), Key=object_path, Metadata=object_metadata
|
||||
)
|
||||
|
||||
def close(self):
|
||||
"""Clean up the boto3 resource."""
|
||||
try:
|
||||
if self.s3.meta is not None:
|
||||
self.s3.meta.client.close()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
@@ -58,7 +58,7 @@ class HECClient(object):
|
||||
self.source = source
|
||||
self.session = requests.Session()
|
||||
self.timeout = timeout
|
||||
self.session.verify = verify
|
||||
self.verify = verify
|
||||
self._common_data: dict[str, Union[str, int, float, dict]] = dict(
|
||||
host=self.host, source=self.source, index=self.index
|
||||
)
|
||||
@@ -124,10 +124,12 @@ class HECClient(object):
|
||||
data["event"] = new_report.copy()
|
||||
json_str += "{0}\n".format(json.dumps(data))
|
||||
|
||||
if not self.session.verify:
|
||||
if not self.verify:
|
||||
logger.debug("Skipping certificate verification for Splunk HEC")
|
||||
try:
|
||||
response = self.session.post(self.url, data=json_str, timeout=self.timeout)
|
||||
response = self.session.post(
|
||||
self.url, data=json_str, verify=self.verify, timeout=self.timeout
|
||||
)
|
||||
response = response.json()
|
||||
except Exception as e:
|
||||
raise SplunkError(e.__str__())
|
||||
@@ -161,10 +163,12 @@ class HECClient(object):
|
||||
data["event"] = report.copy()
|
||||
json_str += "{0}\n".format(json.dumps(data))
|
||||
|
||||
if not self.session.verify:
|
||||
if not self.verify:
|
||||
logger.debug("Skipping certificate verification for Splunk HEC")
|
||||
try:
|
||||
response = self.session.post(self.url, data=json_str, timeout=self.timeout)
|
||||
response = self.session.post(
|
||||
self.url, data=json_str, verify=self.verify, timeout=self.timeout
|
||||
)
|
||||
response = response.json()
|
||||
except Exception as e:
|
||||
raise SplunkError(e.__str__())
|
||||
@@ -198,12 +202,18 @@ class HECClient(object):
|
||||
data["event"] = report.copy()
|
||||
json_str += "{0}\n".format(json.dumps(data))
|
||||
|
||||
if not self.session.verify:
|
||||
if not self.verify:
|
||||
logger.debug("Skipping certificate verification for Splunk HEC")
|
||||
try:
|
||||
response = self.session.post(self.url, data=json_str, timeout=self.timeout)
|
||||
response = self.session.post(
|
||||
self.url, data=json_str, verify=self.verify, timeout=self.timeout
|
||||
)
|
||||
response = response.json()
|
||||
except Exception as e:
|
||||
raise SplunkError(e.__str__())
|
||||
if response["code"] != 0:
|
||||
raise SplunkError(response["text"])
|
||||
|
||||
def close(self):
|
||||
"""Close the underlying HTTP session."""
|
||||
self.session.close()
|
||||
|
||||
@@ -57,7 +57,7 @@ class SyslogClient(object):
|
||||
self.logger.setLevel(logging.INFO)
|
||||
|
||||
# Create the appropriate syslog handler based on protocol
|
||||
log_handler = self._create_syslog_handler(
|
||||
self.log_handler = self._create_syslog_handler(
|
||||
server_name,
|
||||
server_port,
|
||||
self.protocol,
|
||||
@@ -69,7 +69,7 @@ class SyslogClient(object):
|
||||
retry_delay,
|
||||
)
|
||||
|
||||
self.logger.addHandler(log_handler)
|
||||
self.logger.addHandler(self.log_handler)
|
||||
|
||||
def _create_syslog_handler(
|
||||
self,
|
||||
@@ -179,3 +179,8 @@ class SyslogClient(object):
|
||||
rows = parsed_smtp_tls_reports_to_csv_rows(smtp_tls_reports)
|
||||
for row in rows:
|
||||
self.logger.info(json.dumps(row))
|
||||
|
||||
def close(self):
|
||||
"""Remove and close the syslog handler, releasing its socket."""
|
||||
self.logger.removeHandler(self.log_handler)
|
||||
self.log_handler.close()
|
||||
|
||||
@@ -335,6 +335,76 @@ def get_ip_address_country(
|
||||
return country
|
||||
|
||||
|
||||
def load_reverse_dns_map(
|
||||
reverse_dns_map: ReverseDNSMap,
|
||||
*,
|
||||
always_use_local_file: bool = False,
|
||||
local_file_path: Optional[str] = None,
|
||||
url: Optional[str] = None,
|
||||
offline: bool = False,
|
||||
) -> None:
|
||||
"""
|
||||
Loads the reverse DNS map from a URL or local file.
|
||||
|
||||
Clears and repopulates the given map dict in place. If the map is
|
||||
fetched from a URL, that is tried first; on failure (or if offline/local
|
||||
mode is selected) the bundled CSV is used as a fallback.
|
||||
|
||||
Args:
|
||||
reverse_dns_map (dict): The map dict to populate (modified in place)
|
||||
always_use_local_file (bool): Always use a local map file
|
||||
local_file_path (str): Path to a local map file
|
||||
url (str): URL to a reverse DNS map
|
||||
offline (bool): Use the built-in copy of the reverse DNS map
|
||||
"""
|
||||
if url is None:
|
||||
url = (
|
||||
"https://raw.githubusercontent.com/domainaware"
|
||||
"/parsedmarc/master/parsedmarc/"
|
||||
"resources/maps/base_reverse_dns_map.csv"
|
||||
)
|
||||
|
||||
reverse_dns_map.clear()
|
||||
|
||||
def load_csv(_csv_file):
|
||||
reader = csv.DictReader(_csv_file)
|
||||
for row in reader:
|
||||
key = row["base_reverse_dns"].lower().strip()
|
||||
reverse_dns_map[key] = {
|
||||
"name": row["name"].strip(),
|
||||
"type": row["type"].strip(),
|
||||
}
|
||||
|
||||
csv_file = io.StringIO()
|
||||
|
||||
if not (offline or always_use_local_file):
|
||||
try:
|
||||
logger.debug(f"Trying to fetch reverse DNS map from {url}...")
|
||||
headers = {"User-Agent": USER_AGENT}
|
||||
response = requests.get(url, headers=headers)
|
||||
response.raise_for_status()
|
||||
csv_file.write(response.text)
|
||||
csv_file.seek(0)
|
||||
load_csv(csv_file)
|
||||
except requests.exceptions.RequestException as e:
|
||||
logger.warning(f"Failed to fetch reverse DNS map: {e}")
|
||||
except Exception:
|
||||
logger.warning("Not a valid CSV file")
|
||||
csv_file.seek(0)
|
||||
logging.debug("Response body:")
|
||||
logger.debug(csv_file.read())
|
||||
|
||||
if len(reverse_dns_map) == 0:
|
||||
logger.info("Loading included reverse DNS map...")
|
||||
path = str(
|
||||
files(parsedmarc.resources.maps).joinpath("base_reverse_dns_map.csv")
|
||||
)
|
||||
if local_file_path is not None:
|
||||
path = local_file_path
|
||||
with open(path) as csv_file:
|
||||
load_csv(csv_file)
|
||||
|
||||
|
||||
def get_service_from_reverse_dns_base_domain(
|
||||
base_domain,
|
||||
*,
|
||||
@@ -361,55 +431,21 @@ def get_service_from_reverse_dns_base_domain(
|
||||
"""
|
||||
|
||||
base_domain = base_domain.lower().strip()
|
||||
if url is None:
|
||||
url = (
|
||||
"https://raw.githubusercontent.com/domainaware"
|
||||
"/parsedmarc/master/parsedmarc/"
|
||||
"resources/maps/base_reverse_dns_map.csv"
|
||||
)
|
||||
reverse_dns_map_value: ReverseDNSMap
|
||||
if reverse_dns_map is None:
|
||||
reverse_dns_map_value = {}
|
||||
else:
|
||||
reverse_dns_map_value = reverse_dns_map
|
||||
|
||||
def load_csv(_csv_file):
|
||||
reader = csv.DictReader(_csv_file)
|
||||
for row in reader:
|
||||
key = row["base_reverse_dns"].lower().strip()
|
||||
reverse_dns_map_value[key] = {
|
||||
"name": row["name"],
|
||||
"type": row["type"],
|
||||
}
|
||||
|
||||
csv_file = io.StringIO()
|
||||
|
||||
if not (offline or always_use_local_file) and len(reverse_dns_map_value) == 0:
|
||||
try:
|
||||
logger.debug(f"Trying to fetch reverse DNS map from {url}...")
|
||||
headers = {"User-Agent": USER_AGENT}
|
||||
response = requests.get(url, headers=headers)
|
||||
response.raise_for_status()
|
||||
csv_file.write(response.text)
|
||||
csv_file.seek(0)
|
||||
load_csv(csv_file)
|
||||
except requests.exceptions.RequestException as e:
|
||||
logger.warning(f"Failed to fetch reverse DNS map: {e}")
|
||||
except Exception:
|
||||
logger.warning("Not a valid CSV file")
|
||||
csv_file.seek(0)
|
||||
logging.debug("Response body:")
|
||||
logger.debug(csv_file.read())
|
||||
|
||||
if len(reverse_dns_map_value) == 0:
|
||||
logger.info("Loading included reverse DNS map...")
|
||||
path = str(
|
||||
files(parsedmarc.resources.maps).joinpath("base_reverse_dns_map.csv")
|
||||
load_reverse_dns_map(
|
||||
reverse_dns_map_value,
|
||||
always_use_local_file=always_use_local_file,
|
||||
local_file_path=local_file_path,
|
||||
url=url,
|
||||
offline=offline,
|
||||
)
|
||||
if local_file_path is not None:
|
||||
path = local_file_path
|
||||
with open(path) as csv_file:
|
||||
load_csv(csv_file)
|
||||
|
||||
service: ReverseDNSService
|
||||
try:
|
||||
service = reverse_dns_map_value[base_domain]
|
||||
|
||||
@@ -63,3 +63,7 @@ class WebhookClient(object):
|
||||
self.session.post(webhook_url, data=payload, timeout=self.timeout)
|
||||
except Exception as error_:
|
||||
logger.error("Webhook Error: {0}".format(error_.__str__()))
|
||||
|
||||
def close(self):
|
||||
"""Close the underlying HTTP session."""
|
||||
self.session.close()
|
||||
|
||||
@@ -50,7 +50,7 @@ dependencies = [
|
||||
"lxml>=4.4.0",
|
||||
"mailsuite>=1.11.2",
|
||||
"msgraph-core==0.2.2",
|
||||
"opensearch-py>=2.4.2,<=3.0.0",
|
||||
"opensearch-py>=2.4.2,<=4.0.0",
|
||||
"publicsuffixlist>=0.10.0",
|
||||
"pygelf>=0.4.2",
|
||||
"requests>=2.22.0",
|
||||
|
||||
821
tests.py
821
tests.py
@@ -3,18 +3,23 @@
|
||||
|
||||
from __future__ import absolute_import, print_function, unicode_literals
|
||||
|
||||
import io
|
||||
import json
|
||||
import os
|
||||
import signal
|
||||
import sys
|
||||
import tempfile
|
||||
import unittest
|
||||
from base64 import urlsafe_b64encode
|
||||
from configparser import ConfigParser
|
||||
from glob import glob
|
||||
from pathlib import Path
|
||||
from tempfile import NamedTemporaryFile, TemporaryDirectory
|
||||
from typing import cast
|
||||
from types import SimpleNamespace
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
from lxml import etree
|
||||
from lxml import etree # type: ignore[import-untyped]
|
||||
from googleapiclient.errors import HttpError
|
||||
from httplib2 import Response
|
||||
from imapclient.exceptions import IMAPClientError
|
||||
@@ -31,6 +36,7 @@ from parsedmarc.mail.imap import IMAPConnection
|
||||
import parsedmarc.mail.gmail as gmail_module
|
||||
import parsedmarc.mail.graph as graph_module
|
||||
import parsedmarc.mail.imap as imap_module
|
||||
import parsedmarc.elastic
|
||||
import parsedmarc.opensearch as opensearch_module
|
||||
import parsedmarc.utils
|
||||
|
||||
@@ -153,7 +159,7 @@ class Test(unittest.TestCase):
|
||||
report_path,
|
||||
offline=True,
|
||||
)
|
||||
self.assertEqual(result["report_type"], "aggregate")
|
||||
assert result["report_type"] == "aggregate"
|
||||
self.assertEqual(result["report"]["report_metadata"]["org_name"], "outlook.com")
|
||||
|
||||
def testParseReportFileAcceptsPathForEmail(self):
|
||||
@@ -164,7 +170,7 @@ class Test(unittest.TestCase):
|
||||
report_path,
|
||||
offline=True,
|
||||
)
|
||||
self.assertEqual(result["report_type"], "aggregate")
|
||||
assert result["report_type"] == "aggregate"
|
||||
self.assertEqual(result["report"]["report_metadata"]["org_name"], "google.com")
|
||||
|
||||
def testAggregateSamples(self):
|
||||
@@ -175,10 +181,11 @@ class Test(unittest.TestCase):
|
||||
if os.path.isdir(sample_path):
|
||||
continue
|
||||
print("Testing {0}: ".format(sample_path), end="")
|
||||
parsed_report = parsedmarc.parse_report_file(
|
||||
result = parsedmarc.parse_report_file(
|
||||
sample_path, always_use_local_files=True, offline=OFFLINE_MODE
|
||||
)["report"]
|
||||
parsedmarc.parsed_aggregate_reports_to_csv(parsed_report)
|
||||
)
|
||||
assert result["report_type"] == "aggregate"
|
||||
parsedmarc.parsed_aggregate_reports_to_csv(result["report"])
|
||||
print("Passed!")
|
||||
|
||||
def testEmptySample(self):
|
||||
@@ -194,13 +201,13 @@ class Test(unittest.TestCase):
|
||||
print("Testing {0}: ".format(sample_path), end="")
|
||||
with open(sample_path) as sample_file:
|
||||
sample_content = sample_file.read()
|
||||
parsed_report = parsedmarc.parse_report_email(
|
||||
email_result = parsedmarc.parse_report_email(
|
||||
sample_content, offline=OFFLINE_MODE
|
||||
)["report"]
|
||||
parsed_report = parsedmarc.parse_report_file(
|
||||
sample_path, offline=OFFLINE_MODE
|
||||
)["report"]
|
||||
parsedmarc.parsed_forensic_reports_to_csv(parsed_report)
|
||||
)
|
||||
assert email_result["report_type"] == "forensic"
|
||||
result = parsedmarc.parse_report_file(sample_path, offline=OFFLINE_MODE)
|
||||
assert result["report_type"] == "forensic"
|
||||
parsedmarc.parsed_forensic_reports_to_csv(result["report"])
|
||||
print("Passed!")
|
||||
|
||||
def testSmtpTlsSamples(self):
|
||||
@@ -211,10 +218,9 @@ class Test(unittest.TestCase):
|
||||
if os.path.isdir(sample_path):
|
||||
continue
|
||||
print("Testing {0}: ".format(sample_path), end="")
|
||||
parsed_report = parsedmarc.parse_report_file(
|
||||
sample_path, offline=OFFLINE_MODE
|
||||
)["report"]
|
||||
parsedmarc.parsed_smtp_tls_reports_to_csv(parsed_report)
|
||||
result = parsedmarc.parse_report_file(sample_path, offline=OFFLINE_MODE)
|
||||
assert result["report_type"] == "smtp_tls"
|
||||
parsedmarc.parsed_smtp_tls_reports_to_csv(result["report"])
|
||||
print("Passed!")
|
||||
|
||||
def testOpenSearchSigV4RequiresRegion(self):
|
||||
@@ -1274,10 +1280,26 @@ class TestImapFallbacks(unittest.TestCase):
|
||||
|
||||
|
||||
class TestMailboxWatchSince(unittest.TestCase):
|
||||
def setUp(self):
|
||||
from parsedmarc.log import logger as _logger
|
||||
|
||||
_logger.disabled = True
|
||||
self._stdout_patch = patch("sys.stdout", new_callable=io.StringIO)
|
||||
self._stderr_patch = patch("sys.stderr", new_callable=io.StringIO)
|
||||
self._stdout_patch.start()
|
||||
self._stderr_patch.start()
|
||||
|
||||
def tearDown(self):
|
||||
from parsedmarc.log import logger as _logger
|
||||
|
||||
_logger.disabled = False
|
||||
self._stderr_patch.stop()
|
||||
self._stdout_patch.stop()
|
||||
|
||||
def testWatchInboxPassesSinceToMailboxFetch(self):
|
||||
mailbox_connection = SimpleNamespace()
|
||||
|
||||
def fake_watch(check_callback, check_timeout):
|
||||
def fake_watch(check_callback, check_timeout, config_reloading=None):
|
||||
check_callback(mailbox_connection)
|
||||
raise _BreakLoop()
|
||||
|
||||
@@ -1288,7 +1310,9 @@ class TestMailboxWatchSince(unittest.TestCase):
|
||||
) as mocked:
|
||||
with self.assertRaises(_BreakLoop):
|
||||
parsedmarc.watch_inbox(
|
||||
mailbox_connection=mailbox_connection,
|
||||
mailbox_connection=cast(
|
||||
parsedmarc.MailboxConnection, mailbox_connection
|
||||
),
|
||||
callback=callback,
|
||||
check_timeout=1,
|
||||
batch_size=10,
|
||||
@@ -1336,34 +1360,50 @@ since = 2d
|
||||
self.assertEqual(mock_watch_inbox.call_args.kwargs.get("since"), "2d")
|
||||
|
||||
|
||||
class _DummyMailboxConnection:
|
||||
class _DummyMailboxConnection(parsedmarc.MailboxConnection):
|
||||
def __init__(self):
|
||||
self.fetch_calls = []
|
||||
self.fetch_calls: list[dict[str, object]] = []
|
||||
|
||||
def create_folder(self, folder_name):
|
||||
def create_folder(self, folder_name: str):
|
||||
return None
|
||||
|
||||
def fetch_messages(self, reports_folder, **kwargs):
|
||||
def fetch_messages(self, reports_folder: str, **kwargs):
|
||||
self.fetch_calls.append({"reports_folder": reports_folder, **kwargs})
|
||||
return []
|
||||
|
||||
def fetch_message(self, message_id, **kwargs):
|
||||
def fetch_message(self, message_id) -> str:
|
||||
return ""
|
||||
|
||||
def delete_message(self, message_id):
|
||||
return None
|
||||
|
||||
def move_message(self, message_id, folder_name):
|
||||
def move_message(self, message_id, folder_name: str):
|
||||
return None
|
||||
|
||||
def keepalive(self):
|
||||
return None
|
||||
|
||||
def watch(self, check_callback, check_timeout):
|
||||
def watch(self, check_callback, check_timeout, config_reloading=None):
|
||||
return None
|
||||
|
||||
|
||||
class TestMailboxPerformance(unittest.TestCase):
|
||||
def setUp(self):
|
||||
from parsedmarc.log import logger as _logger
|
||||
|
||||
_logger.disabled = True
|
||||
self._stdout_patch = patch("sys.stdout", new_callable=io.StringIO)
|
||||
self._stderr_patch = patch("sys.stderr", new_callable=io.StringIO)
|
||||
self._stdout_patch.start()
|
||||
self._stderr_patch.start()
|
||||
|
||||
def tearDown(self):
|
||||
from parsedmarc.log import logger as _logger
|
||||
|
||||
_logger.disabled = False
|
||||
self._stderr_patch.stop()
|
||||
self._stdout_patch.stop()
|
||||
|
||||
def testBatchModeAvoidsExtraFullFetch(self):
|
||||
connection = _DummyMailboxConnection()
|
||||
parsedmarc.get_dmarc_reports_from_mailbox(
|
||||
@@ -1558,7 +1598,7 @@ class TestMSGraphFolderFallback(unittest.TestCase):
|
||||
def testWellKnownFolderFallback(self):
|
||||
connection = MSGraphConnection.__new__(MSGraphConnection)
|
||||
connection.mailbox_name = "shared@example.com"
|
||||
connection._client = _FakeGraphClient()
|
||||
connection._client = _FakeGraphClient() # type: ignore[assignment]
|
||||
connection._request_with_retries = MagicMock(
|
||||
side_effect=lambda method_name, *args, **kwargs: getattr(
|
||||
connection._client, method_name
|
||||
@@ -1578,7 +1618,7 @@ class TestMSGraphFolderFallback(unittest.TestCase):
|
||||
def testUnknownFolderStillFails(self):
|
||||
connection = MSGraphConnection.__new__(MSGraphConnection)
|
||||
connection.mailbox_name = "shared@example.com"
|
||||
connection._client = _FakeGraphClient()
|
||||
connection._client = _FakeGraphClient() # type: ignore[assignment]
|
||||
connection._request_with_retries = MagicMock(
|
||||
side_effect=lambda method_name, *args, **kwargs: getattr(
|
||||
connection._client, method_name
|
||||
@@ -1910,5 +1950,732 @@ certificate_path = /tmp/msgraph-cert.pem
|
||||
mock_get_mailbox_reports.assert_not_called()
|
||||
|
||||
|
||||
class TestSighupReload(unittest.TestCase):
|
||||
"""Tests for SIGHUP-driven configuration reload in watch mode."""
|
||||
|
||||
def setUp(self):
|
||||
from parsedmarc.log import logger as _logger
|
||||
|
||||
_logger.disabled = True
|
||||
self._stdout_patch = patch("sys.stdout", new_callable=io.StringIO)
|
||||
self._stderr_patch = patch("sys.stderr", new_callable=io.StringIO)
|
||||
self._stdout_patch.start()
|
||||
self._stderr_patch.start()
|
||||
|
||||
def tearDown(self):
|
||||
from parsedmarc.log import logger as _logger
|
||||
|
||||
_logger.disabled = False
|
||||
self._stderr_patch.stop()
|
||||
self._stdout_patch.stop()
|
||||
|
||||
_BASE_CONFIG = """[general]
|
||||
silent = true
|
||||
|
||||
[imap]
|
||||
host = imap.example.com
|
||||
user = user
|
||||
password = pass
|
||||
|
||||
[mailbox]
|
||||
watch = true
|
||||
"""
|
||||
|
||||
@unittest.skipUnless(
|
||||
hasattr(signal, "SIGHUP"),
|
||||
"SIGHUP not available on this platform",
|
||||
)
|
||||
@patch("parsedmarc.cli._init_output_clients")
|
||||
@patch("parsedmarc.cli._parse_config")
|
||||
@patch("parsedmarc.cli._load_config")
|
||||
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
|
||||
@patch("parsedmarc.cli.watch_inbox")
|
||||
@patch("parsedmarc.cli.IMAPConnection")
|
||||
def testSighupTriggersReloadAndWatchRestarts(
|
||||
self,
|
||||
mock_imap,
|
||||
mock_watch,
|
||||
mock_get_reports,
|
||||
mock_load_config,
|
||||
mock_parse_config,
|
||||
mock_init_clients,
|
||||
):
|
||||
"""SIGHUP causes watch to return, config is re-parsed, and watch restarts."""
|
||||
import signal as signal_module
|
||||
|
||||
mock_imap.return_value = object()
|
||||
mock_get_reports.return_value = {
|
||||
"aggregate_reports": [],
|
||||
"forensic_reports": [],
|
||||
"smtp_tls_reports": [],
|
||||
}
|
||||
|
||||
mock_load_config.return_value = ConfigParser()
|
||||
|
||||
def parse_side_effect(config, opts):
|
||||
opts.imap_host = "imap.example.com"
|
||||
opts.imap_user = "user"
|
||||
opts.imap_password = "pass"
|
||||
opts.mailbox_watch = True
|
||||
return None
|
||||
|
||||
mock_parse_config.side_effect = parse_side_effect
|
||||
mock_init_clients.return_value = {}
|
||||
|
||||
call_count = [0]
|
||||
|
||||
def watch_side_effect(*args, **kwargs):
|
||||
call_count[0] += 1
|
||||
if call_count[0] == 1:
|
||||
# Simulate SIGHUP arriving while watch is running
|
||||
if hasattr(signal_module, "SIGHUP"):
|
||||
import os
|
||||
|
||||
os.kill(os.getpid(), signal_module.SIGHUP)
|
||||
return # Normal return — reload loop will continue
|
||||
else:
|
||||
raise FileExistsError("stop-watch-loop")
|
||||
|
||||
mock_watch.side_effect = watch_side_effect
|
||||
|
||||
with tempfile.NamedTemporaryFile("w", suffix=".ini", delete=False) as cfg:
|
||||
cfg.write(self._BASE_CONFIG)
|
||||
cfg_path = cfg.name
|
||||
self.addCleanup(lambda: os.path.exists(cfg_path) and os.remove(cfg_path))
|
||||
|
||||
with patch.object(sys, "argv", ["parsedmarc", "-c", cfg_path]):
|
||||
with self.assertRaises(SystemExit) as cm:
|
||||
parsedmarc.cli._main()
|
||||
|
||||
# Exited with code 1 (from FileExistsError handler)
|
||||
self.assertEqual(cm.exception.code, 1)
|
||||
# watch_inbox was called twice: initial run + after reload
|
||||
self.assertEqual(mock_watch.call_count, 2)
|
||||
# _parse_config called for initial load + reload
|
||||
self.assertGreaterEqual(mock_parse_config.call_count, 2)
|
||||
|
||||
@unittest.skipUnless(
|
||||
hasattr(signal, "SIGHUP"),
|
||||
"SIGHUP not available on this platform",
|
||||
)
|
||||
@patch("parsedmarc.cli._init_output_clients")
|
||||
@patch("parsedmarc.cli._parse_config")
|
||||
@patch("parsedmarc.cli._load_config")
|
||||
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
|
||||
@patch("parsedmarc.cli.watch_inbox")
|
||||
@patch("parsedmarc.cli.IMAPConnection")
|
||||
def testInvalidConfigOnReloadKeepsPreviousState(
|
||||
self,
|
||||
mock_imap,
|
||||
mock_watch,
|
||||
mock_get_reports,
|
||||
mock_load_config,
|
||||
mock_parse_config,
|
||||
mock_init_clients,
|
||||
):
|
||||
"""A failing reload leaves opts and clients unchanged."""
|
||||
import signal as signal_module
|
||||
|
||||
mock_imap.return_value = object()
|
||||
mock_get_reports.return_value = {
|
||||
"aggregate_reports": [],
|
||||
"forensic_reports": [],
|
||||
"smtp_tls_reports": [],
|
||||
}
|
||||
|
||||
mock_load_config.return_value = ConfigParser()
|
||||
|
||||
# Initial parse sets required opts; reload parse raises
|
||||
initial_map = {"prefix_": ["example.com"]}
|
||||
call_count = [0]
|
||||
|
||||
def parse_side_effect(config, opts):
|
||||
call_count[0] += 1
|
||||
opts.imap_host = "imap.example.com"
|
||||
opts.imap_user = "user"
|
||||
opts.imap_password = "pass"
|
||||
opts.mailbox_watch = True
|
||||
if call_count[0] == 1:
|
||||
return initial_map
|
||||
raise RuntimeError("bad config")
|
||||
|
||||
mock_parse_config.side_effect = parse_side_effect
|
||||
|
||||
initial_clients = {"s3_client": MagicMock()}
|
||||
mock_init_clients.return_value = initial_clients
|
||||
|
||||
watch_calls = [0]
|
||||
|
||||
def watch_side_effect(*args, **kwargs):
|
||||
watch_calls[0] += 1
|
||||
if watch_calls[0] == 1:
|
||||
if hasattr(signal_module, "SIGHUP"):
|
||||
import os
|
||||
|
||||
os.kill(os.getpid(), signal_module.SIGHUP)
|
||||
return
|
||||
else:
|
||||
raise FileExistsError("stop")
|
||||
|
||||
mock_watch.side_effect = watch_side_effect
|
||||
|
||||
with tempfile.NamedTemporaryFile("w", suffix=".ini", delete=False) as cfg:
|
||||
cfg.write(self._BASE_CONFIG)
|
||||
cfg_path = cfg.name
|
||||
self.addCleanup(lambda: os.path.exists(cfg_path) and os.remove(cfg_path))
|
||||
|
||||
with patch.object(sys, "argv", ["parsedmarc", "-c", cfg_path]):
|
||||
with self.assertRaises(SystemExit) as cm:
|
||||
parsedmarc.cli._main()
|
||||
|
||||
self.assertEqual(cm.exception.code, 1)
|
||||
# watch was still called twice (reload loop continued after failed reload)
|
||||
self.assertEqual(mock_watch.call_count, 2)
|
||||
# The failed reload must not have closed the original clients
|
||||
initial_clients["s3_client"].close.assert_not_called()
|
||||
|
||||
@unittest.skipUnless(
|
||||
hasattr(signal, "SIGHUP"),
|
||||
"SIGHUP not available on this platform",
|
||||
)
|
||||
@patch("parsedmarc.cli._init_output_clients")
|
||||
@patch("parsedmarc.cli._parse_config")
|
||||
@patch("parsedmarc.cli._load_config")
|
||||
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
|
||||
@patch("parsedmarc.cli.watch_inbox")
|
||||
@patch("parsedmarc.cli.IMAPConnection")
|
||||
def testReloadClosesOldClients(
|
||||
self,
|
||||
mock_imap,
|
||||
mock_watch,
|
||||
mock_get_reports,
|
||||
mock_load_config,
|
||||
mock_parse_config,
|
||||
mock_init_clients,
|
||||
):
|
||||
"""Successful reload closes the old output clients before replacing them."""
|
||||
import signal as signal_module
|
||||
|
||||
mock_imap.return_value = object()
|
||||
mock_get_reports.return_value = {
|
||||
"aggregate_reports": [],
|
||||
"forensic_reports": [],
|
||||
"smtp_tls_reports": [],
|
||||
}
|
||||
|
||||
mock_load_config.return_value = ConfigParser()
|
||||
|
||||
def parse_side_effect(config, opts):
|
||||
opts.imap_host = "imap.example.com"
|
||||
opts.imap_user = "user"
|
||||
opts.imap_password = "pass"
|
||||
opts.mailbox_watch = True
|
||||
return None
|
||||
|
||||
mock_parse_config.side_effect = parse_side_effect
|
||||
|
||||
old_client = MagicMock()
|
||||
new_client = MagicMock()
|
||||
init_call = [0]
|
||||
|
||||
def init_side_effect(opts):
|
||||
init_call[0] += 1
|
||||
if init_call[0] == 1:
|
||||
return {"kafka_client": old_client}
|
||||
return {"kafka_client": new_client}
|
||||
|
||||
mock_init_clients.side_effect = init_side_effect
|
||||
|
||||
watch_calls = [0]
|
||||
|
||||
def watch_side_effect(*args, **kwargs):
|
||||
watch_calls[0] += 1
|
||||
if watch_calls[0] == 1:
|
||||
if hasattr(signal_module, "SIGHUP"):
|
||||
import os
|
||||
|
||||
os.kill(os.getpid(), signal_module.SIGHUP)
|
||||
return
|
||||
else:
|
||||
raise FileExistsError("stop")
|
||||
|
||||
mock_watch.side_effect = watch_side_effect
|
||||
|
||||
with tempfile.NamedTemporaryFile("w", suffix=".ini", delete=False) as cfg:
|
||||
cfg.write(self._BASE_CONFIG)
|
||||
cfg_path = cfg.name
|
||||
self.addCleanup(lambda: os.path.exists(cfg_path) and os.remove(cfg_path))
|
||||
|
||||
with patch.object(sys, "argv", ["parsedmarc", "-c", cfg_path]):
|
||||
with self.assertRaises(SystemExit):
|
||||
parsedmarc.cli._main()
|
||||
|
||||
# Old client must have been closed when reload succeeded
|
||||
old_client.close.assert_called_once()
|
||||
|
||||
@unittest.skipUnless(
|
||||
hasattr(signal, "SIGHUP"),
|
||||
"SIGHUP not available on this platform",
|
||||
)
|
||||
@patch("parsedmarc.cli._init_output_clients")
|
||||
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
|
||||
@patch("parsedmarc.cli.watch_inbox")
|
||||
@patch("parsedmarc.cli.IMAPConnection")
|
||||
def testRemovedConfigSectionTakesEffectOnReload(
|
||||
self,
|
||||
mock_imap,
|
||||
mock_watch,
|
||||
mock_get_reports,
|
||||
mock_init_clients,
|
||||
):
|
||||
"""Removing a config section on reload resets that option to its default."""
|
||||
import signal as signal_module
|
||||
|
||||
mock_imap.return_value = object()
|
||||
mock_get_reports.return_value = {
|
||||
"aggregate_reports": [],
|
||||
"forensic_reports": [],
|
||||
"smtp_tls_reports": [],
|
||||
}
|
||||
mock_init_clients.return_value = {}
|
||||
|
||||
# First config sets kafka_hosts (with required topics); second removes it.
|
||||
config_v1 = (
|
||||
self._BASE_CONFIG
|
||||
+ "\n[kafka]\nhosts = kafka.example.com:9092\n"
|
||||
+ "aggregate_topic = dmarc_agg\n"
|
||||
+ "forensic_topic = dmarc_forensic\n"
|
||||
+ "smtp_tls_topic = smtp_tls\n"
|
||||
)
|
||||
config_v2 = self._BASE_CONFIG # no [kafka] section
|
||||
|
||||
with tempfile.NamedTemporaryFile("w", suffix=".ini", delete=False) as cfg:
|
||||
cfg.write(config_v1)
|
||||
cfg_path = cfg.name
|
||||
self.addCleanup(lambda: os.path.exists(cfg_path) and os.remove(cfg_path))
|
||||
|
||||
watch_calls = [0]
|
||||
|
||||
def watch_side_effect(*args, **kwargs):
|
||||
watch_calls[0] += 1
|
||||
if watch_calls[0] == 1:
|
||||
# Rewrite config to remove kafka before triggering reload
|
||||
with open(cfg_path, "w") as f:
|
||||
f.write(config_v2)
|
||||
if hasattr(signal_module, "SIGHUP"):
|
||||
import os
|
||||
|
||||
os.kill(os.getpid(), signal_module.SIGHUP)
|
||||
return
|
||||
else:
|
||||
raise FileExistsError("stop")
|
||||
|
||||
mock_watch.side_effect = watch_side_effect
|
||||
|
||||
# Capture opts used on each _init_output_clients call
|
||||
init_opts_captures = []
|
||||
|
||||
def init_side_effect(opts):
|
||||
from argparse import Namespace as NS
|
||||
|
||||
init_opts_captures.append(NS(**vars(opts)))
|
||||
return {}
|
||||
|
||||
mock_init_clients.side_effect = init_side_effect
|
||||
|
||||
with patch.object(sys, "argv", ["parsedmarc", "-c", cfg_path]):
|
||||
with self.assertRaises(SystemExit):
|
||||
parsedmarc.cli._main()
|
||||
|
||||
# First init: kafka_hosts should be set from v1 config
|
||||
self.assertIsNotNone(init_opts_captures[0].kafka_hosts)
|
||||
# Second init (after reload with v2 config): kafka_hosts should be None
|
||||
self.assertIsNone(init_opts_captures[1].kafka_hosts)
|
||||
|
||||
@unittest.skipUnless(
|
||||
hasattr(signal, "SIGHUP"),
|
||||
"SIGHUP not available on this platform",
|
||||
)
|
||||
@patch("parsedmarc.cli._init_output_clients")
|
||||
@patch("parsedmarc.cli._parse_config")
|
||||
@patch("parsedmarc.cli._load_config")
|
||||
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
|
||||
@patch("parsedmarc.cli.watch_inbox")
|
||||
@patch("parsedmarc.cli.IMAPConnection")
|
||||
def testReloadRefreshesReverseDnsMap(
|
||||
self,
|
||||
mock_imap,
|
||||
mock_watch,
|
||||
mock_get_reports,
|
||||
mock_load_config,
|
||||
mock_parse_config,
|
||||
mock_init_clients,
|
||||
):
|
||||
"""SIGHUP reload repopulates the reverse DNS map so lookups still work."""
|
||||
import signal as signal_module
|
||||
|
||||
from parsedmarc import REVERSE_DNS_MAP
|
||||
|
||||
mock_imap.return_value = object()
|
||||
mock_get_reports.return_value = {
|
||||
"aggregate_reports": [],
|
||||
"forensic_reports": [],
|
||||
"smtp_tls_reports": [],
|
||||
}
|
||||
|
||||
mock_load_config.return_value = ConfigParser()
|
||||
|
||||
def parse_side_effect(config, opts):
|
||||
opts.imap_host = "imap.example.com"
|
||||
opts.imap_user = "user"
|
||||
opts.imap_password = "pass"
|
||||
opts.mailbox_watch = True
|
||||
return None
|
||||
|
||||
mock_parse_config.side_effect = parse_side_effect
|
||||
mock_init_clients.return_value = {}
|
||||
|
||||
# Snapshot the map state after each watch_inbox call
|
||||
map_snapshots = []
|
||||
|
||||
watch_calls = [0]
|
||||
|
||||
def watch_side_effect(*args, **kwargs):
|
||||
watch_calls[0] += 1
|
||||
if watch_calls[0] == 1:
|
||||
if hasattr(signal_module, "SIGHUP"):
|
||||
import os
|
||||
|
||||
os.kill(os.getpid(), signal_module.SIGHUP)
|
||||
return
|
||||
else:
|
||||
# Capture the map state after reload, before we stop the loop
|
||||
map_snapshots.append(dict(REVERSE_DNS_MAP))
|
||||
raise FileExistsError("stop")
|
||||
|
||||
mock_watch.side_effect = watch_side_effect
|
||||
|
||||
with tempfile.NamedTemporaryFile("w", suffix=".ini", delete=False) as cfg:
|
||||
cfg.write(self._BASE_CONFIG)
|
||||
cfg_path = cfg.name
|
||||
self.addCleanup(lambda: os.path.exists(cfg_path) and os.remove(cfg_path))
|
||||
|
||||
# Pre-populate the map so we can verify it gets refreshed
|
||||
REVERSE_DNS_MAP.clear()
|
||||
REVERSE_DNS_MAP["stale.example.com"] = {
|
||||
"name": "Stale",
|
||||
"type": "stale",
|
||||
}
|
||||
original_contents = dict(REVERSE_DNS_MAP)
|
||||
|
||||
with patch.object(sys, "argv", ["parsedmarc", "-c", cfg_path]):
|
||||
with self.assertRaises(SystemExit):
|
||||
parsedmarc.cli._main()
|
||||
|
||||
self.assertEqual(mock_watch.call_count, 2)
|
||||
# The map should have been repopulated (not empty, not the stale data)
|
||||
self.assertEqual(len(map_snapshots), 1)
|
||||
refreshed = map_snapshots[0]
|
||||
self.assertGreater(len(refreshed), 0, "Map should not be empty after reload")
|
||||
self.assertNotEqual(
|
||||
refreshed,
|
||||
original_contents,
|
||||
"Map should have been refreshed, not kept stale data",
|
||||
)
|
||||
self.assertNotIn(
|
||||
"stale.example.com",
|
||||
refreshed,
|
||||
"Stale entry should have been cleared by reload",
|
||||
)
|
||||
|
||||
|
||||
class TestIndexPrefixDomainMapTlsFiltering(unittest.TestCase):
|
||||
"""Tests that SMTP TLS reports for unmapped domains are filtered out
|
||||
when index_prefix_domain_map is configured."""
|
||||
|
||||
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
|
||||
@patch("parsedmarc.cli.IMAPConnection")
|
||||
def testTlsReportsFilteredByDomainMap(
|
||||
self,
|
||||
mock_imap_connection,
|
||||
mock_get_reports,
|
||||
):
|
||||
"""TLS reports for domains not in the map should be silently dropped."""
|
||||
mock_imap_connection.return_value = object()
|
||||
mock_get_reports.return_value = {
|
||||
"aggregate_reports": [],
|
||||
"forensic_reports": [],
|
||||
"smtp_tls_reports": [
|
||||
{
|
||||
"organization_name": "Allowed Org",
|
||||
"begin_date": "2024-01-01T00:00:00Z",
|
||||
"end_date": "2024-01-01T23:59:59Z",
|
||||
"report_id": "allowed-1",
|
||||
"contact_info": "tls@allowed.example.com",
|
||||
"policies": [
|
||||
{
|
||||
"policy_domain": "allowed.example.com",
|
||||
"policy_type": "sts",
|
||||
"successful_session_count": 1,
|
||||
"failed_session_count": 0,
|
||||
}
|
||||
],
|
||||
},
|
||||
{
|
||||
"organization_name": "Unmapped Org",
|
||||
"begin_date": "2024-01-01T00:00:00Z",
|
||||
"end_date": "2024-01-01T23:59:59Z",
|
||||
"report_id": "unmapped-1",
|
||||
"contact_info": "tls@unmapped.example.net",
|
||||
"policies": [
|
||||
{
|
||||
"policy_domain": "unmapped.example.net",
|
||||
"policy_type": "sts",
|
||||
"successful_session_count": 5,
|
||||
"failed_session_count": 0,
|
||||
}
|
||||
],
|
||||
},
|
||||
{
|
||||
"organization_name": "Mixed Case Org",
|
||||
"begin_date": "2024-01-01T00:00:00Z",
|
||||
"end_date": "2024-01-01T23:59:59Z",
|
||||
"report_id": "mixed-case-1",
|
||||
"contact_info": "tls@mixedcase.example.com",
|
||||
"policies": [
|
||||
{
|
||||
"policy_domain": "MixedCase.Example.Com",
|
||||
"policy_type": "sts",
|
||||
"successful_session_count": 2,
|
||||
"failed_session_count": 0,
|
||||
}
|
||||
],
|
||||
},
|
||||
],
|
||||
}
|
||||
|
||||
domain_map = {"tenant_a": ["example.com"]}
|
||||
with NamedTemporaryFile("w", suffix=".yaml", delete=False) as map_file:
|
||||
import yaml
|
||||
|
||||
yaml.dump(domain_map, map_file)
|
||||
map_path = map_file.name
|
||||
self.addCleanup(lambda: os.path.exists(map_path) and os.remove(map_path))
|
||||
|
||||
config = f"""[general]
|
||||
save_smtp_tls = true
|
||||
silent = false
|
||||
index_prefix_domain_map = {map_path}
|
||||
|
||||
[imap]
|
||||
host = imap.example.com
|
||||
user = test-user
|
||||
password = test-password
|
||||
"""
|
||||
with NamedTemporaryFile("w", suffix=".ini", delete=False) as config_file:
|
||||
config_file.write(config)
|
||||
config_path = config_file.name
|
||||
self.addCleanup(lambda: os.path.exists(config_path) and os.remove(config_path))
|
||||
|
||||
captured = io.StringIO()
|
||||
with patch.object(sys, "argv", ["parsedmarc", "-c", config_path]):
|
||||
with patch("sys.stdout", captured):
|
||||
parsedmarc.cli._main()
|
||||
|
||||
output = json.loads(captured.getvalue())
|
||||
tls_reports = output["smtp_tls_reports"]
|
||||
self.assertEqual(len(tls_reports), 2)
|
||||
report_ids = {r["report_id"] for r in tls_reports}
|
||||
self.assertIn("allowed-1", report_ids)
|
||||
self.assertIn("mixed-case-1", report_ids)
|
||||
self.assertNotIn("unmapped-1", report_ids)
|
||||
|
||||
|
||||
class TestEnvVarConfig(unittest.TestCase):
    """Tests for environment variable configuration support.

    Covers the ``PARSEDMARC_{SECTION}_{KEY}`` mapping helpers in
    ``parsedmarc.cli``: resolving env var names to config sections/keys,
    injecting overrides into a ``ConfigParser``, and loading config
    purely from the environment (no INI file).
    """

    def test_resolve_section_key_simple(self):
        """Simple section names resolve correctly."""
        from parsedmarc.cli import _resolve_section_key

        # env-var suffix -> expected (section, key) pair
        cases = {
            "IMAP_PASSWORD": ("imap", "password"),
            "GENERAL_DEBUG": ("general", "debug"),
            "S3_BUCKET": ("s3", "bucket"),
            "GELF_HOST": ("gelf", "host"),
        }
        for suffix, expected in cases.items():
            self.assertEqual(_resolve_section_key(suffix), expected)

    def test_resolve_section_key_underscore_sections(self):
        """Multi-word section names (splunk_hec, gmail_api, etc.) resolve correctly."""
        from parsedmarc.cli import _resolve_section_key

        # Sections whose own names contain underscores must win over the
        # naive "split on first underscore" interpretation.
        cases = {
            "SPLUNK_HEC_TOKEN": ("splunk_hec", "token"),
            "GMAIL_API_CREDENTIALS_FILE": ("gmail_api", "credentials_file"),
            "LOG_ANALYTICS_CLIENT_ID": ("log_analytics", "client_id"),
        }
        for suffix, expected in cases.items():
            self.assertEqual(_resolve_section_key(suffix), expected)

    def test_resolve_section_key_unknown(self):
        """Unknown prefixes return (None, None)."""
        from parsedmarc.cli import _resolve_section_key

        self.assertEqual(_resolve_section_key("UNKNOWN_FOO"), (None, None))
        # Just a section name with no key should not match
        self.assertEqual(_resolve_section_key("IMAP"), (None, None))

    def test_apply_env_overrides_injects_values(self):
        """Env vars are injected into an existing ConfigParser."""
        from configparser import ConfigParser
        from parsedmarc.cli import _apply_env_overrides

        parser = ConfigParser()
        parser.add_section("imap")
        parser.set("imap", "host", "original.example.com")

        overrides = {
            "PARSEDMARC_IMAP_HOST": "new.example.com",
            "PARSEDMARC_IMAP_PASSWORD": "secret123",
        }
        with patch.dict(os.environ, overrides, clear=False):
            _apply_env_overrides(parser)

        # Existing key is replaced; missing key is added.
        self.assertEqual(parser.get("imap", "host"), "new.example.com")
        self.assertEqual(parser.get("imap", "password"), "secret123")

    def test_apply_env_overrides_creates_sections(self):
        """Env vars create new sections when they don't exist."""
        from configparser import ConfigParser
        from parsedmarc.cli import _apply_env_overrides

        parser = ConfigParser()

        overrides = {"PARSEDMARC_ELASTICSEARCH_HOSTS": "http://localhost:9200"}
        with patch.dict(os.environ, overrides, clear=False):
            _apply_env_overrides(parser)

        self.assertTrue(parser.has_section("elasticsearch"))
        self.assertEqual(parser.get("elasticsearch", "hosts"), "http://localhost:9200")

    def test_apply_env_overrides_ignores_config_file_var(self):
        """PARSEDMARC_CONFIG_FILE is not injected as a config key."""
        from configparser import ConfigParser
        from parsedmarc.cli import _apply_env_overrides

        parser = ConfigParser()

        overrides = {"PARSEDMARC_CONFIG_FILE": "/some/path.ini"}
        with patch.dict(os.environ, overrides, clear=False):
            _apply_env_overrides(parser)

        # The reserved variable names the config *file*, not a config value,
        # so no section should appear.
        self.assertEqual(parser.sections(), [])

    def test_load_config_with_file_and_env_override(self):
        """Env vars override values from an INI file."""
        from parsedmarc.cli import _load_config

        with NamedTemporaryFile(mode="w", suffix=".ini", delete=False) as handle:
            handle.write(
                "[imap]\nhost = file.example.com\nuser = alice\npassword = fromfile\n"
            )
            handle.flush()
            ini_path = handle.name

        try:
            overrides = {"PARSEDMARC_IMAP_PASSWORD": "fromenv"}
            with patch.dict(os.environ, overrides, clear=False):
                loaded = _load_config(ini_path)

            # File values survive except where the environment overrides.
            self.assertEqual(loaded.get("imap", "host"), "file.example.com")
            self.assertEqual(loaded.get("imap", "user"), "alice")
            self.assertEqual(loaded.get("imap", "password"), "fromenv")
        finally:
            os.unlink(ini_path)

    def test_load_config_env_only(self):
        """Config can be loaded purely from env vars with no file."""
        from parsedmarc.cli import _load_config

        overrides = {
            "PARSEDMARC_GENERAL_DEBUG": "true",
            "PARSEDMARC_ELASTICSEARCH_HOSTS": "http://localhost:9200",
        }
        with patch.dict(os.environ, overrides, clear=False):
            loaded = _load_config(None)

        self.assertEqual(loaded.get("general", "debug"), "true")
        self.assertEqual(loaded.get("elasticsearch", "hosts"), "http://localhost:9200")

    def test_parse_config_from_env(self):
        """Full round-trip: env vars -> ConfigParser -> opts."""
        from argparse import Namespace
        from parsedmarc.cli import _load_config, _parse_config

        overrides = {
            "PARSEDMARC_GENERAL_DEBUG": "true",
            "PARSEDMARC_GENERAL_SAVE_AGGREGATE": "true",
            "PARSEDMARC_GENERAL_OFFLINE": "true",
        }
        with patch.dict(os.environ, overrides, clear=False):
            loaded = _load_config(None)

        opts = Namespace()
        _parse_config(loaded, opts)

        self.assertTrue(opts.debug)
        self.assertTrue(opts.save_aggregate)
        self.assertTrue(opts.offline)

    def test_config_file_env_var(self):
        """PARSEDMARC_CONFIG_FILE env var specifies the config file path."""
        from argparse import Namespace
        from parsedmarc.cli import _load_config, _parse_config

        with NamedTemporaryFile(mode="w", suffix=".ini", delete=False) as handle:
            handle.write("[general]\ndebug = true\noffline = true\n")
            handle.flush()
            ini_path = handle.name

        try:
            overrides = {"PARSEDMARC_CONFIG_FILE": ini_path}
            with patch.dict(os.environ, overrides, clear=False):
                loaded = _load_config(os.environ.get("PARSEDMARC_CONFIG_FILE"))

            opts = Namespace()
            _parse_config(loaded, opts)
            self.assertTrue(opts.debug)
            self.assertTrue(opts.offline)
        finally:
            os.unlink(ini_path)

    def test_boolean_values_from_env(self):
        """Various boolean string representations work through ConfigParser."""
        from configparser import ConfigParser
        from parsedmarc.cli import _apply_env_overrides

        # (raw env value, expected truthiness) pairs, truthy first.
        cases = [(raw, True) for raw in ("true", "yes", "1", "on", "True", "YES")]
        cases += [(raw, False) for raw in ("false", "no", "0", "off", "False", "NO")]

        for raw, expected in cases:
            parser = ConfigParser()
            with patch.dict(
                os.environ, {"PARSEDMARC_GENERAL_DEBUG": raw}, clear=False
            ):
                _apply_env_overrides(parser)
            result = parser.getboolean("general", "debug")
            if expected:
                self.assertTrue(result, f"Expected truthy for {raw!r}")
            else:
                self.assertFalse(result, f"Expected falsy for {raw!r}")
|
||||
|
||||
# Allow running this test module directly: `python tests.py`
if __name__ == "__main__":
    unittest.main(verbosity=2)
|
||||
|
||||
Reference in New Issue
Block a user