mirror of
https://github.com/domainaware/parsedmarc.git
synced 2026-03-26 08:22:45 +00:00
Compare commits
13 Commits
copilot/su
...
9.5.4
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1542936468 | ||
|
|
fb3c38a8b8 | ||
|
|
c9a6145505 | ||
|
|
e1bdbeb257 | ||
|
|
12c4676b79 | ||
|
|
cda039ee27 | ||
|
|
ff0ca6538c | ||
|
|
2032438d3b | ||
|
|
1e95c5d30b | ||
|
|
cb2384be83 | ||
|
|
9a5b5310fa | ||
|
|
9849598100 | ||
|
|
e82f3e58a1 |
@@ -7,7 +7,8 @@
|
||||
"Bash(GITHUB_ACTIONS=true pytest --cov tests.py)",
|
||||
"Bash(ls tests*)",
|
||||
"Bash(GITHUB_ACTIONS=true python -m pytest --cov tests.py -x)",
|
||||
"Bash(GITHUB_ACTIONS=true python -m pytest tests.py -x -v)"
|
||||
"Bash(GITHUB_ACTIONS=true python -m pytest tests.py -x -v)",
|
||||
"Bash(python -m pytest tests.py --no-header -q)"
|
||||
],
|
||||
"additionalDirectories": [
|
||||
"/tmp"
|
||||
|
||||
2
.vscode/settings.json
vendored
2
.vscode/settings.json
vendored
@@ -52,6 +52,7 @@
|
||||
"geoipupdate",
|
||||
"Geolite",
|
||||
"geolocation",
|
||||
"getuid",
|
||||
"githubpages",
|
||||
"Grafana",
|
||||
"hostnames",
|
||||
@@ -75,6 +76,7 @@
|
||||
"LISTSERV",
|
||||
"loganalytics",
|
||||
"lxml",
|
||||
"Maildir",
|
||||
"mailparser",
|
||||
"mailrelay",
|
||||
"mailsuite",
|
||||
|
||||
@@ -42,7 +42,7 @@ To skip DNS lookups during testing, set `GITHUB_ACTIONS=true`.
|
||||
### Key modules
|
||||
|
||||
- `parsedmarc/__init__.py` — Core parsing logic. Main functions: `parse_report_file()`, `parse_report_email()`, `parse_aggregate_report_xml()`, `parse_forensic_report()`, `parse_smtp_tls_report_json()`, `get_dmarc_reports_from_mailbox()`, `watch_inbox()`
|
||||
- `parsedmarc/cli.py` — CLI entry point (`_main`), config file parsing, output orchestration
|
||||
- `parsedmarc/cli.py` — CLI entry point (`_main`), config file parsing (`_load_config` + `_parse_config`), output orchestration. Supports configuration via INI files, `PARSEDMARC_{SECTION}_{KEY}` environment variables, or both (env vars override file values).
|
||||
- `parsedmarc/types.py` — TypedDict definitions for all report types (`AggregateReport`, `ForensicReport`, `SMTPTLSReport`, `ParsingResults`)
|
||||
- `parsedmarc/utils.py` — IP/DNS/GeoIP enrichment, base64 decoding, compression handling
|
||||
- `parsedmarc/mail/` — Polymorphic mail connections: `IMAPConnection`, `GmailConnection`, `MSGraphConnection`, `MaildirConnection`
|
||||
@@ -52,6 +52,10 @@ To skip DNS lookups during testing, set `GITHUB_ACTIONS=true`.
|
||||
|
||||
`ReportType = Literal["aggregate", "forensic", "smtp_tls"]`. Exception hierarchy: `ParserError` → `InvalidDMARCReport` → `InvalidAggregateReport`/`InvalidForensicReport`, and `InvalidSMTPTLSReport`.
|
||||
|
||||
### Configuration
|
||||
|
||||
Config priority: CLI args > env vars > config file > defaults. Env var naming: `PARSEDMARC_{SECTION}_{KEY}` (e.g. `PARSEDMARC_IMAP_PASSWORD`). Section names with underscores use longest-prefix matching (`PARSEDMARC_SPLUNK_HEC_TOKEN` → `[splunk_hec] token`). Some INI keys have short aliases for env var friendliness (e.g. `[maildir] create` for `maildir_create`). File path values are expanded via `os.path.expanduser`/`os.path.expandvars`. Config can be loaded purely from env vars with no file (`PARSEDMARC_CONFIG_FILE` sets the file path).
|
||||
|
||||
### Caching
|
||||
|
||||
IP address info cached for 4 hours, seen aggregate report IDs cached for 1 hour (via `ExpiringDict`).
|
||||
@@ -62,3 +66,6 @@ IP address info cached for 4 hours, seen aggregate report IDs cached for 1 hour
|
||||
- TypedDict for structured data, type hints throughout
|
||||
- Python ≥3.10 required
|
||||
- Tests are in a single `tests.py` file using unittest; sample reports live in `samples/`
|
||||
- File path config values must be wrapped with `_expand_path()` in `cli.py`
|
||||
- Maildir UID checks are intentionally relaxed (warn, don't crash) for Docker compatibility
|
||||
- Token file writes must create parent directories before opening for write
|
||||
|
||||
91
CHANGELOG.md
91
CHANGELOG.md
@@ -1,23 +1,96 @@
|
||||
# Changelog
|
||||
|
||||
## 9.5.4
|
||||
|
||||
### Fixed
|
||||
|
||||
- Maildir `fetch_messages` now respects the `reports_folder` argument. Previously it always read from the top-level Maildir, ignoring the configured reports folder. `fetch_message`, `delete_message`, and `move_message` now also operate on the correct active folder.
|
||||
- Config key aliases for env var compatibility: `[maildir] create` and `path` are now accepted as aliases for `maildir_create` and `maildir_path`, and `[msgraph] url` for `graph_url`. This allows natural env var names like `PARSEDMARC_MAILDIR_CREATE` to work without the redundant `PARSEDMARC_MAILDIR_MAILDIR_CREATE`.
|
||||
|
||||
## 9.5.3
|
||||
|
||||
### Fixed
|
||||
|
||||
- Fixed `FileNotFoundError` when using Maildir with Docker volume mounts. Python's `mailbox.Maildir(create=True)` only creates `cur/new/tmp` subdirectories when the top-level directory doesn't exist; Docker volume mounts pre-create the directory as empty, skipping subdirectory creation. parsedmarc now explicitly creates the subdirectories when `maildir_create` is enabled.
|
||||
- Maildir UID mismatch no longer crashes the process. In Docker containers where volume ownership differs from the container UID, parsedmarc now logs a warning instead of raising an exception. Also handles `os.setuid` failures gracefully in containers without `CAP_SETUID`.
|
||||
- Token file writes (MS Graph and Gmail) now create parent directories automatically, preventing `FileNotFoundError` when the token path points to a directory that doesn't yet exist.
|
||||
- File paths from config (`token_file`, `credentials_file`, `cert_path`, `log_file`, `output`, `ip_db_path`, `maildir_path`, syslog cert paths, etc.) now expand `~` and `$VAR` references via `os.path.expanduser`/`os.path.expandvars`.
|
||||
|
||||
## 9.5.2
|
||||
|
||||
### Fixed
|
||||
|
||||
- Fixed `ValueError: invalid interpolation syntax` when config values (from env vars or INI files) contain `%` characters, such as in passwords. Disabled ConfigParser's `%`-based string interpolation.
|
||||
|
||||
## 9.5.1
|
||||
|
||||
### Changes
|
||||
|
||||
- Correct ISO format for MSGraphConnection timestamps (PR #706)
|
||||
|
||||
## 9.5.0
|
||||
|
||||
### Added
|
||||
|
||||
- Environment variable configuration support: any config option can now be set via `PARSEDMARC_{SECTION}_{KEY}` environment variables (e.g. `PARSEDMARC_IMAP_PASSWORD`, `PARSEDMARC_SPLUNK_HEC_TOKEN`). Environment variables override config file values but are overridden by CLI arguments.
|
||||
- `PARSEDMARC_CONFIG_FILE` environment variable to specify the config file path without the `-c` flag.
|
||||
- Env-only mode: parsedmarc can now run without a config file when `PARSEDMARC_*` environment variables are set, enabling fully file-less Docker deployments.
|
||||
- Explicit read permission check on config file, giving a clear error message when the container UID cannot read the file (e.g. `chmod 600` with a UID mismatch).
|
||||
|
||||
## 9.4.0
|
||||
|
||||
### Added
|
||||
|
||||
- Extracted `load_reverse_dns_map()` utility function in `utils.py` for loading the reverse DNS map independently of individual IP lookups.
|
||||
- SIGHUP reload now re-downloads/reloads the reverse DNS map, so changes take effect without restarting.
|
||||
- Add premade OpenSearch index patterns, visualizations, and dashboards
|
||||
|
||||
### Changed
|
||||
|
||||
- When `index_prefix_domain_map` is configured, SMTP TLS reports for domains not in the map are now silently dropped instead of being output. Unlike DMARC, TLS-RPT has no DNS authorization records, so this filtering prevents processing reports for unrelated domains.
|
||||
- Bump OpenSearch support to `< 4`
|
||||
|
||||
### Fixed
|
||||
|
||||
- Fixed `get_index_prefix` using wrong key (`domain` instead of `policy_domain`) for SMTP TLS reports, which prevented domain map matching from working for TLS reports.
|
||||
- Domain matching in `get_index_prefix` now lowercases the domain for case-insensitive comparison.
|
||||
|
||||
## 9.3.1
|
||||
|
||||
### Breaking changes
|
||||
|
||||
- Elasticsearch and OpenSearch now verify SSL certificates by default when `ssl = True`, even without a `cert_path`
|
||||
- Added `skip_certificate_verification` option to the `elasticsearch` and `opensearch` configuration sections for consistency with `splunk_hec`
|
||||
|
||||
### Fixed
|
||||
|
||||
- Splunk HEC `skip_certificate_verification` now works correctly
|
||||
- SMTP TLS reports no longer fail when saving to multiple output targets (e.g. Elasticsearch and OpenSearch) due to in-place mutation of the report dict
|
||||
- Output client initialization errors now identify which module failed (e.g. "OpenSearch: ConnectionError..." instead of generic "Output client error")
|
||||
|
||||
## 9.3.0
|
||||
|
||||
### Added
|
||||
|
||||
- SIGHUP-based configuration reload for watch mode — update output
|
||||
destinations, DNS/GeoIP settings, processing flags, and log level
|
||||
without restarting the service or interrupting in-progress report
|
||||
processing. Use `systemctl reload parsedmarc` when running under
|
||||
systemd.
|
||||
- Extracted `_parse_config_file()` and `_init_output_clients()` from
|
||||
`_main()` in `cli.py` to support config reload and reduce code
|
||||
duplication.
|
||||
- SIGHUP-based configuration reload for watch mode — update output destinations, DNS/GeoIP settings, processing flags, and log level without restarting the service or interrupting in-progress report processing.
|
||||
- Use `systemctl reload parsedmarc` when running under `systemd`.
|
||||
- On a successful reload, old output clients are closed and recreated.
|
||||
- On a failed reload, the previous configuration remains fully active.
|
||||
- `close()` methods on `GelfClient`, `KafkaClient`, `SyslogClient`, `WebhookClient`, HECClient, and `S3Client` for clean resource teardown on reload.
|
||||
- `config_reloading` parameter on all `MailboxConnection.watch()` implementations and `watch_inbox()` to ensure SIGHUP never triggers a new email batch mid-reload.
|
||||
- Elasticsearch and OpenSearch connections are now tracked and cleaned up on reload via `_close_output_clients()`.
|
||||
- Extracted `_parse_config_file()` and `_init_output_clients()` from `_main()` in `cli.py` to support config reload and reduce code duplication.
|
||||
|
||||
### Fixed
|
||||
|
||||
- `get_index_prefix()` crashed on forensic reports with `TypeError` due to `report()` instead of `report[]` dict access.
|
||||
- Missing `exit(1)` after IMAP user/password validation failure allowed execution to continue with `None` credentials.
|
||||
|
||||
## 9.2.1
|
||||
|
||||
### Added
|
||||
|
||||
- Better checking of `msconfig` configuration (PR #695)
|
||||
- Better checking of `msgraph` configuration (PR #695)
|
||||
|
||||
### Changed
|
||||
|
||||
|
||||
@@ -15,7 +15,7 @@ services:
|
||||
condition: service_healthy
|
||||
|
||||
opensearch-dashboards:
|
||||
image: opensearchproject/opensearch-dashboards:2
|
||||
image: opensearchproject/opensearch-dashboards:3
|
||||
environment:
|
||||
- OPENSEARCH_HOSTS=["https://opensearch:9200"]
|
||||
ports:
|
||||
@@ -27,7 +27,7 @@ services:
|
||||
grafana:
|
||||
image: grafana/grafana:latest
|
||||
environment:
|
||||
- GF_SECURITY_ADMIN_PASSWORD=${GRAFANA_PASSWORD}
|
||||
- GRAFANA_PASSWORD=${GRAFANA_PASSWORD}
|
||||
- GF_INSTALL_PLUGINS=grafana-piechart-panel,grafana-worldmap-panel
|
||||
ports:
|
||||
- "127.0.0.1:3000:3000"
|
||||
@@ -41,5 +41,7 @@ services:
|
||||
- SPLUNK_START_ARGS=--accept-license
|
||||
- "SPLUNK_GENERAL_TERMS=--accept-sgt-current-at-splunk-com"
|
||||
- SPLUNK_PASSWORD=${SPLUNK_PASSWORD}
|
||||
- SPLUNK_HEC_TOKEN=${SPLUNK_HEC_TOKEN}
|
||||
ports:
|
||||
- "127.0.0.1:8000:8000"
|
||||
- "127.0.0.1:8088:8088"
|
||||
|
||||
@@ -273,6 +273,8 @@ The full set of configuration options are:
|
||||
(Default: `True`)
|
||||
- `timeout` - float: Timeout in seconds (Default: 60)
|
||||
- `cert_path` - str: Path to a file of trusted certificates
|
||||
- `skip_certificate_verification` - bool: Skip certificate
|
||||
verification (not recommended)
|
||||
- `index_suffix` - str: A suffix to apply to the index names
|
||||
- `index_prefix` - str: A prefix to apply to the index names
|
||||
- `monthly_indexes` - bool: Use monthly indexes instead of daily indexes
|
||||
@@ -300,6 +302,8 @@ The full set of configuration options are:
|
||||
(Default: `True`)
|
||||
- `timeout` - float: Timeout in seconds (Default: 60)
|
||||
- `cert_path` - str: Path to a file of trusted certificates
|
||||
- `skip_certificate_verification` - bool: Skip certificate
|
||||
verification (not recommended)
|
||||
- `index_suffix` - str: A suffix to apply to the index names
|
||||
- `index_prefix` - str: A prefix to apply to the index names
|
||||
- `monthly_indexes` - bool: Use monthly indexes instead of daily indexes
|
||||
@@ -527,6 +531,96 @@ PUT _cluster/settings
|
||||
Increasing this value increases resource usage.
|
||||
:::
|
||||
|
||||
## Environment variable configuration
|
||||
|
||||
Any configuration option can be set via environment variables using the
|
||||
naming convention `PARSEDMARC_{SECTION}_{KEY}` (uppercase). This is
|
||||
especially useful for Docker deployments where file permissions make it
|
||||
difficult to use config files for secrets.
|
||||
|
||||
**Priority order:** CLI arguments > environment variables > config file > defaults
|
||||
|
||||
### Examples
|
||||
|
||||
```bash
|
||||
# Set IMAP credentials via env vars
|
||||
export PARSEDMARC_IMAP_HOST=imap.example.com
|
||||
export PARSEDMARC_IMAP_USER=dmarc@example.com
|
||||
export PARSEDMARC_IMAP_PASSWORD=secret
|
||||
|
||||
# Elasticsearch
|
||||
export PARSEDMARC_ELASTICSEARCH_HOSTS=http://localhost:9200
|
||||
export PARSEDMARC_ELASTICSEARCH_SSL=false
|
||||
|
||||
# Splunk HEC (note: section name splunk_hec becomes SPLUNK_HEC)
|
||||
export PARSEDMARC_SPLUNK_HEC_URL=https://splunk.example.com
|
||||
export PARSEDMARC_SPLUNK_HEC_TOKEN=my-hec-token
|
||||
export PARSEDMARC_SPLUNK_HEC_INDEX=email
|
||||
|
||||
# General settings
|
||||
export PARSEDMARC_GENERAL_SAVE_AGGREGATE=true
|
||||
export PARSEDMARC_GENERAL_DEBUG=true
|
||||
```
|
||||
|
||||
### Specifying the config file via environment variable
|
||||
|
||||
```bash
|
||||
export PARSEDMARC_CONFIG_FILE=/etc/parsedmarc.ini
|
||||
parsedmarc
|
||||
```
|
||||
|
||||
### Running without a config file (env-only mode)
|
||||
|
||||
When no config file is given (neither `-c` flag nor `PARSEDMARC_CONFIG_FILE`),
|
||||
parsedmarc will still pick up any `PARSEDMARC_*` environment variables. This
|
||||
enables fully file-less deployments:
|
||||
|
||||
```bash
|
||||
export PARSEDMARC_GENERAL_SAVE_AGGREGATE=true
|
||||
export PARSEDMARC_GENERAL_OFFLINE=true
|
||||
export PARSEDMARC_ELASTICSEARCH_HOSTS=http://elasticsearch:9200
|
||||
parsedmarc /path/to/reports/*
|
||||
```
|
||||
|
||||
### Docker Compose example
|
||||
|
||||
```yaml
|
||||
services:
|
||||
parsedmarc:
|
||||
image: parsedmarc:latest
|
||||
environment:
|
||||
PARSEDMARC_IMAP_HOST: imap.example.com
|
||||
PARSEDMARC_IMAP_USER: dmarc@example.com
|
||||
PARSEDMARC_IMAP_PASSWORD: ${IMAP_PASSWORD}
|
||||
PARSEDMARC_MAILBOX_WATCH: "true"
|
||||
PARSEDMARC_ELASTICSEARCH_HOSTS: http://elasticsearch:9200
|
||||
PARSEDMARC_GENERAL_SAVE_AGGREGATE: "true"
|
||||
PARSEDMARC_GENERAL_SAVE_FORENSIC: "true"
|
||||
```
|
||||
|
||||
### Section name mapping
|
||||
|
||||
Each section maps to an env var prefix; sections whose names contain underscores keep the underscore in the prefix (the full section name is used):
|
||||
|
||||
| Section | Env var prefix |
|
||||
|------------------|-------------------------------|
|
||||
| `general` | `PARSEDMARC_GENERAL_` |
|
||||
| `mailbox` | `PARSEDMARC_MAILBOX_` |
|
||||
| `imap` | `PARSEDMARC_IMAP_` |
|
||||
| `msgraph` | `PARSEDMARC_MSGRAPH_` |
|
||||
| `elasticsearch` | `PARSEDMARC_ELASTICSEARCH_` |
|
||||
| `opensearch` | `PARSEDMARC_OPENSEARCH_` |
|
||||
| `splunk_hec` | `PARSEDMARC_SPLUNK_HEC_` |
|
||||
| `kafka` | `PARSEDMARC_KAFKA_` |
|
||||
| `smtp` | `PARSEDMARC_SMTP_` |
|
||||
| `s3` | `PARSEDMARC_S3_` |
|
||||
| `syslog` | `PARSEDMARC_SYSLOG_` |
|
||||
| `gmail_api` | `PARSEDMARC_GMAIL_API_` |
|
||||
| `maildir` | `PARSEDMARC_MAILDIR_` |
|
||||
| `log_analytics` | `PARSEDMARC_LOG_ANALYTICS_` |
|
||||
| `gelf` | `PARSEDMARC_GELF_` |
|
||||
| `webhook` | `PARSEDMARC_WEBHOOK_` |
|
||||
|
||||
## Performance tuning
|
||||
|
||||
For large mailbox imports or backfills, parsedmarc can consume a noticeable amount
|
||||
@@ -666,8 +760,15 @@ Mailbox connection settings (IMAP host/credentials, Microsoft Graph,
|
||||
Gmail API, Maildir path) are **not** reloaded — changing those still
|
||||
requires a full restart.
|
||||
|
||||
If the new configuration file contains errors, the reload is aborted
|
||||
and the previous configuration remains active. Check the logs for
|
||||
On a **successful** reload, existing output client connections are
|
||||
closed and new ones are created from the updated configuration. The
|
||||
service then resumes watching with the new settings.
|
||||
|
||||
If the new configuration file contains errors (missing required
|
||||
settings, unreachable output destinations, etc.), the **entire reload
|
||||
is aborted** — no output clients are replaced and the previous
|
||||
configuration remains fully active. This means a typo in one section
|
||||
will not take down an otherwise working setup. Check the logs for
|
||||
details:
|
||||
|
||||
```bash
|
||||
|
||||
28
opensearch/opensearch_dashboards.ndjson
Normal file
28
opensearch/opensearch_dashboards.ndjson
Normal file
File diff suppressed because one or more lines are too long
@@ -1955,9 +1955,7 @@ def get_dmarc_reports_from_mailbox(
|
||||
)
|
||||
current_time = datetime.now(timezone.utc).strftime("%d-%b-%Y")
|
||||
elif isinstance(connection, MSGraphConnection):
|
||||
since = (
|
||||
datetime.now(timezone.utc) - timedelta(minutes=_since)
|
||||
).isoformat() + "Z"
|
||||
since = (datetime.now(timezone.utc) - timedelta(minutes=_since)).isoformat()
|
||||
current_time = datetime.now(timezone.utc).isoformat() + "Z"
|
||||
elif isinstance(connection, GmailConnection):
|
||||
since = (datetime.now(timezone.utc) - timedelta(minutes=_since)).strftime(
|
||||
@@ -2195,7 +2193,7 @@ def watch_inbox(
|
||||
batch_size: int = 10,
|
||||
since: Optional[Union[datetime, date, str]] = None,
|
||||
normalize_timespan_threshold_hours: float = 24,
|
||||
should_reload: Optional[Callable] = None,
|
||||
config_reloading: Optional[Callable] = None,
|
||||
):
|
||||
"""
|
||||
Watches the mailbox for new messages and
|
||||
@@ -2223,7 +2221,7 @@ def watch_inbox(
|
||||
batch_size (int): Number of messages to read and process before saving
|
||||
since: Search for messages since a certain time
|
||||
normalize_timespan_threshold_hours (float): Normalize timespans beyond this
|
||||
should_reload: Optional callable that returns True when a config
|
||||
config_reloading: Optional callable that returns True when a config
|
||||
reload has been requested (e.g. via SIGHUP)
|
||||
"""
|
||||
|
||||
@@ -2253,8 +2251,8 @@ def watch_inbox(
|
||||
"check_callback": check_callback,
|
||||
"check_timeout": check_timeout,
|
||||
}
|
||||
if should_reload is not None:
|
||||
watch_kwargs["should_reload"] = should_reload
|
||||
if config_reloading is not None:
|
||||
watch_kwargs["config_reloading"] = config_reloading
|
||||
|
||||
mailbox_connection.watch(**watch_kwargs)
|
||||
|
||||
|
||||
@@ -19,6 +19,7 @@ import yaml
|
||||
from tqdm import tqdm
|
||||
|
||||
from parsedmarc import (
|
||||
REVERSE_DNS_MAP,
|
||||
SEEN_AGGREGATE_REPORT_IDS,
|
||||
InvalidDMARCReport,
|
||||
ParserError,
|
||||
@@ -48,7 +49,12 @@ from parsedmarc.mail import (
|
||||
)
|
||||
from parsedmarc.mail.graph import AuthMethod
|
||||
from parsedmarc.types import ParsingResults
|
||||
from parsedmarc.utils import get_base_domain, get_reverse_dns, is_mbox
|
||||
from parsedmarc.utils import (
|
||||
get_base_domain,
|
||||
get_reverse_dns,
|
||||
is_mbox,
|
||||
load_reverse_dns_map,
|
||||
)
|
||||
|
||||
# Increase the max header limit for very large emails. `_MAXHEADERS` is a
|
||||
# private stdlib attribute and may not exist in type stubs.
|
||||
@@ -69,6 +75,84 @@ def _str_to_list(s):
|
||||
return list(map(lambda i: i.lstrip(), _list))
|
||||
|
||||
|
||||
def _expand_path(p: str) -> str:
|
||||
"""Expand ``~`` and ``$VAR`` references in a file path."""
|
||||
return os.path.expanduser(os.path.expandvars(p))
|
||||
|
||||
|
||||
# All known INI config section names, used for env var resolution.
|
||||
_KNOWN_SECTIONS = frozenset(
|
||||
{
|
||||
"general",
|
||||
"mailbox",
|
||||
"imap",
|
||||
"msgraph",
|
||||
"elasticsearch",
|
||||
"opensearch",
|
||||
"splunk_hec",
|
||||
"kafka",
|
||||
"smtp",
|
||||
"s3",
|
||||
"syslog",
|
||||
"gmail_api",
|
||||
"maildir",
|
||||
"log_analytics",
|
||||
"gelf",
|
||||
"webhook",
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
def _resolve_section_key(suffix: str) -> tuple:
|
||||
"""Resolve an env var suffix like ``IMAP_PASSWORD`` to ``('imap', 'password')``.
|
||||
|
||||
Uses longest-prefix matching against known section names so that
|
||||
multi-word sections like ``splunk_hec`` are handled correctly.
|
||||
|
||||
Returns ``(None, None)`` when no known section matches.
|
||||
"""
|
||||
suffix_lower = suffix.lower()
|
||||
|
||||
best_section = None
|
||||
best_key = None
|
||||
for section in _KNOWN_SECTIONS:
|
||||
section_prefix = section + "_"
|
||||
if suffix_lower.startswith(section_prefix):
|
||||
key = suffix_lower[len(section_prefix) :]
|
||||
if key and (best_section is None or len(section) > len(best_section)):
|
||||
best_section = section
|
||||
best_key = key
|
||||
|
||||
return best_section, best_key
|
||||
|
||||
|
||||
def _apply_env_overrides(config: ConfigParser) -> None:
|
||||
"""Inject ``PARSEDMARC_*`` environment variables into *config*.
|
||||
|
||||
Environment variables matching ``PARSEDMARC_{SECTION}_{KEY}`` override
|
||||
(or create) the corresponding config-file values. Sections are created
|
||||
automatically when they do not yet exist.
|
||||
"""
|
||||
prefix = "PARSEDMARC_"
|
||||
|
||||
for env_key, env_value in os.environ.items():
|
||||
if not env_key.startswith(prefix) or env_key == "PARSEDMARC_CONFIG_FILE":
|
||||
continue
|
||||
|
||||
suffix = env_key[len(prefix) :]
|
||||
section, key = _resolve_section_key(suffix)
|
||||
|
||||
if section is None:
|
||||
logger.debug("Ignoring unrecognized env var: %s", env_key)
|
||||
continue
|
||||
|
||||
if not config.has_section(section):
|
||||
config.add_section(section)
|
||||
|
||||
config.set(section, key, env_value)
|
||||
logger.debug("Config override from env: [%s] %s", section, key)
|
||||
|
||||
|
||||
def _configure_logging(log_level, log_file=None):
|
||||
"""
|
||||
Configure logging for the current process.
|
||||
@@ -172,12 +256,39 @@ class ConfigurationError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
def _parse_config_file(config_file, opts):
|
||||
"""Parse a config file and update opts in place.
|
||||
def _load_config(config_file: str | None = None) -> ConfigParser:
|
||||
"""Load configuration from an INI file and/or environment variables.
|
||||
|
||||
Args:
|
||||
config_file: Path to the .ini config file
|
||||
opts: Namespace object to update with parsed values
|
||||
config_file: Optional path to an .ini config file.
|
||||
|
||||
Returns:
|
||||
A ``ConfigParser`` populated from the file (if given) and from any
|
||||
``PARSEDMARC_*`` environment variables.
|
||||
|
||||
Raises:
|
||||
ConfigurationError: If *config_file* is given but does not exist.
|
||||
"""
|
||||
config = ConfigParser(interpolation=None)
|
||||
if config_file is not None:
|
||||
abs_path = os.path.abspath(config_file)
|
||||
if not os.path.exists(abs_path):
|
||||
raise ConfigurationError("A file does not exist at {0}".format(abs_path))
|
||||
if not os.access(abs_path, os.R_OK):
|
||||
raise ConfigurationError(
|
||||
"Unable to read {0} — check file permissions".format(abs_path)
|
||||
)
|
||||
config.read(config_file)
|
||||
_apply_env_overrides(config)
|
||||
return config
|
||||
|
||||
|
||||
def _parse_config(config: ConfigParser, opts):
|
||||
"""Apply a loaded ``ConfigParser`` to *opts* in place.
|
||||
|
||||
Args:
|
||||
config: A ``ConfigParser`` (from ``_load_config``).
|
||||
opts: Namespace object to update with parsed values.
|
||||
|
||||
Returns:
|
||||
index_prefix_domain_map or None
|
||||
@@ -185,13 +296,8 @@ def _parse_config_file(config_file, opts):
|
||||
Raises:
|
||||
ConfigurationError: If required settings are missing or invalid.
|
||||
"""
|
||||
abs_path = os.path.abspath(config_file)
|
||||
if not os.path.exists(abs_path):
|
||||
raise ConfigurationError("A file does not exist at {0}".format(abs_path))
|
||||
opts.silent = True
|
||||
config = ConfigParser()
|
||||
index_prefix_domain_map = None
|
||||
config.read(config_file)
|
||||
if "general" in config.sections():
|
||||
general_config = config["general"]
|
||||
if "silent" in general_config:
|
||||
@@ -201,21 +307,8 @@ def _parse_config_file(config_file, opts):
|
||||
"normalize_timespan_threshold_hours"
|
||||
)
|
||||
if "index_prefix_domain_map" in general_config:
|
||||
map_path = general_config["index_prefix_domain_map"]
|
||||
try:
|
||||
with open(map_path) as f:
|
||||
index_prefix_domain_map = yaml.safe_load(f)
|
||||
except OSError as exc:
|
||||
raise ConfigurationError(
|
||||
"Failed to read index_prefix_domain_map file '{0}': {1}".format(
|
||||
map_path, exc
|
||||
)
|
||||
) from exc
|
||||
except yaml.YAMLError as exc:
|
||||
raise ConfigurationError(
|
||||
"Failed to parse YAML in index_prefix_domain_map "
|
||||
"file '{0}': {1}".format(map_path, exc)
|
||||
) from exc
|
||||
with open(_expand_path(general_config["index_prefix_domain_map"])) as f:
|
||||
index_prefix_domain_map = yaml.safe_load(f)
|
||||
if "offline" in general_config:
|
||||
opts.offline = bool(general_config.getboolean("offline"))
|
||||
if "strip_attachment_payloads" in general_config:
|
||||
@@ -223,7 +316,7 @@ def _parse_config_file(config_file, opts):
|
||||
general_config.getboolean("strip_attachment_payloads")
|
||||
)
|
||||
if "output" in general_config:
|
||||
opts.output = general_config["output"]
|
||||
opts.output = _expand_path(general_config["output"])
|
||||
if "aggregate_json_filename" in general_config:
|
||||
opts.aggregate_json_filename = general_config["aggregate_json_filename"]
|
||||
if "forensic_json_filename" in general_config:
|
||||
@@ -279,11 +372,11 @@ def _parse_config_file(config_file, opts):
|
||||
general_config.getboolean("fail_on_output_error")
|
||||
)
|
||||
if "log_file" in general_config:
|
||||
opts.log_file = general_config["log_file"]
|
||||
opts.log_file = _expand_path(general_config["log_file"])
|
||||
if "n_procs" in general_config:
|
||||
opts.n_procs = general_config.getint("n_procs")
|
||||
if "ip_db_path" in general_config:
|
||||
opts.ip_db_path = general_config["ip_db_path"]
|
||||
opts.ip_db_path = _expand_path(general_config["ip_db_path"])
|
||||
else:
|
||||
opts.ip_db_path = None
|
||||
if "always_use_local_files" in general_config:
|
||||
@@ -291,7 +384,9 @@ def _parse_config_file(config_file, opts):
|
||||
general_config.getboolean("always_use_local_files")
|
||||
)
|
||||
if "local_reverse_dns_map_path" in general_config:
|
||||
opts.reverse_dns_map_path = general_config["local_reverse_dns_map_path"]
|
||||
opts.reverse_dns_map_path = _expand_path(
|
||||
general_config["local_reverse_dns_map_path"]
|
||||
)
|
||||
if "reverse_dns_map_url" in general_config:
|
||||
opts.reverse_dns_map_url = general_config["reverse_dns_map_url"]
|
||||
if "prettify_json" in general_config:
|
||||
@@ -406,7 +501,7 @@ def _parse_config_file(config_file, opts):
|
||||
|
||||
if "msgraph" in config.sections():
|
||||
graph_config = config["msgraph"]
|
||||
opts.graph_token_file = graph_config.get("token_file", ".token")
|
||||
opts.graph_token_file = _expand_path(graph_config.get("token_file", ".token"))
|
||||
|
||||
if "auth_method" not in graph_config:
|
||||
logger.info(
|
||||
@@ -460,7 +555,9 @@ def _parse_config_file(config_file, opts):
|
||||
|
||||
if opts.graph_auth_method == AuthMethod.Certificate.name:
|
||||
if "certificate_path" in graph_config:
|
||||
opts.graph_certificate_path = graph_config["certificate_path"]
|
||||
opts.graph_certificate_path = _expand_path(
|
||||
graph_config["certificate_path"]
|
||||
)
|
||||
else:
|
||||
raise ConfigurationError(
|
||||
"certificate_path setting missing from the msgraph config section"
|
||||
@@ -484,6 +581,8 @@ def _parse_config_file(config_file, opts):
|
||||
|
||||
if "graph_url" in graph_config:
|
||||
opts.graph_url = graph_config["graph_url"]
|
||||
elif "url" in graph_config:
|
||||
opts.graph_url = graph_config["url"]
|
||||
|
||||
if "allow_unencrypted_storage" in graph_config:
|
||||
opts.graph_allow_unencrypted_storage = bool(
|
||||
@@ -517,7 +616,13 @@ def _parse_config_file(config_file, opts):
|
||||
if "ssl" in elasticsearch_config:
|
||||
opts.elasticsearch_ssl = bool(elasticsearch_config.getboolean("ssl"))
|
||||
if "cert_path" in elasticsearch_config:
|
||||
opts.elasticsearch_ssl_cert_path = elasticsearch_config["cert_path"]
|
||||
opts.elasticsearch_ssl_cert_path = _expand_path(
|
||||
elasticsearch_config["cert_path"]
|
||||
)
|
||||
if "skip_certificate_verification" in elasticsearch_config:
|
||||
opts.elasticsearch_skip_certificate_verification = bool(
|
||||
elasticsearch_config.getboolean("skip_certificate_verification")
|
||||
)
|
||||
if "user" in elasticsearch_config:
|
||||
opts.elasticsearch_username = elasticsearch_config["user"]
|
||||
if "password" in elasticsearch_config:
|
||||
@@ -556,7 +661,11 @@ def _parse_config_file(config_file, opts):
|
||||
if "ssl" in opensearch_config:
|
||||
opts.opensearch_ssl = bool(opensearch_config.getboolean("ssl"))
|
||||
if "cert_path" in opensearch_config:
|
||||
opts.opensearch_ssl_cert_path = opensearch_config["cert_path"]
|
||||
opts.opensearch_ssl_cert_path = _expand_path(opensearch_config["cert_path"])
|
||||
if "skip_certificate_verification" in opensearch_config:
|
||||
opts.opensearch_skip_certificate_verification = bool(
|
||||
opensearch_config.getboolean("skip_certificate_verification")
|
||||
)
|
||||
if "user" in opensearch_config:
|
||||
opts.opensearch_username = opensearch_config["user"]
|
||||
if "password" in opensearch_config:
|
||||
@@ -679,7 +788,7 @@ def _parse_config_file(config_file, opts):
|
||||
if "subject" in smtp_config:
|
||||
opts.smtp_subject = smtp_config["subject"]
|
||||
if "attachment" in smtp_config:
|
||||
opts.smtp_attachment = smtp_config["attachment"]
|
||||
opts.smtp_attachment = _expand_path(smtp_config["attachment"])
|
||||
if "message" in smtp_config:
|
||||
opts.smtp_message = smtp_config["message"]
|
||||
|
||||
@@ -726,11 +835,11 @@ def _parse_config_file(config_file, opts):
|
||||
else:
|
||||
opts.syslog_protocol = "udp"
|
||||
if "cafile_path" in syslog_config:
|
||||
opts.syslog_cafile_path = syslog_config["cafile_path"]
|
||||
opts.syslog_cafile_path = _expand_path(syslog_config["cafile_path"])
|
||||
if "certfile_path" in syslog_config:
|
||||
opts.syslog_certfile_path = syslog_config["certfile_path"]
|
||||
opts.syslog_certfile_path = _expand_path(syslog_config["certfile_path"])
|
||||
if "keyfile_path" in syslog_config:
|
||||
opts.syslog_keyfile_path = syslog_config["keyfile_path"]
|
||||
opts.syslog_keyfile_path = _expand_path(syslog_config["keyfile_path"])
|
||||
if "timeout" in syslog_config:
|
||||
opts.syslog_timeout = float(syslog_config["timeout"])
|
||||
else:
|
||||
@@ -746,8 +855,13 @@ def _parse_config_file(config_file, opts):
|
||||
|
||||
if "gmail_api" in config.sections():
|
||||
gmail_api_config = config["gmail_api"]
|
||||
opts.gmail_api_credentials_file = gmail_api_config.get("credentials_file")
|
||||
opts.gmail_api_token_file = gmail_api_config.get("token_file", ".token")
|
||||
gmail_creds = gmail_api_config.get("credentials_file")
|
||||
opts.gmail_api_credentials_file = (
|
||||
_expand_path(gmail_creds) if gmail_creds else gmail_creds
|
||||
)
|
||||
opts.gmail_api_token_file = _expand_path(
|
||||
gmail_api_config.get("token_file", ".token")
|
||||
)
|
||||
opts.gmail_api_include_spam_trash = bool(
|
||||
gmail_api_config.getboolean("include_spam_trash", False)
|
||||
)
|
||||
@@ -760,21 +874,27 @@ def _parse_config_file(config_file, opts):
|
||||
if "oauth2_port" in gmail_api_config:
|
||||
opts.gmail_api_oauth2_port = gmail_api_config.getint("oauth2_port", 8080)
|
||||
if "auth_mode" in gmail_api_config:
|
||||
opts.gmail_api_auth_mode = gmail_api_config.get("auth_mode").strip()
|
||||
opts.gmail_api_auth_mode = gmail_api_config["auth_mode"].strip()
|
||||
if "service_account_user" in gmail_api_config:
|
||||
opts.gmail_api_service_account_user = gmail_api_config.get(
|
||||
opts.gmail_api_service_account_user = gmail_api_config[
|
||||
"service_account_user"
|
||||
).strip()
|
||||
].strip()
|
||||
elif "delegated_user" in gmail_api_config:
|
||||
opts.gmail_api_service_account_user = gmail_api_config.get(
|
||||
opts.gmail_api_service_account_user = gmail_api_config[
|
||||
"delegated_user"
|
||||
).strip()
|
||||
].strip()
|
||||
|
||||
if "maildir" in config.sections():
|
||||
maildir_api_config = config["maildir"]
|
||||
opts.maildir_path = maildir_api_config.get("maildir_path")
|
||||
maildir_p = maildir_api_config.get(
|
||||
"maildir_path", maildir_api_config.get("path")
|
||||
)
|
||||
opts.maildir_path = _expand_path(maildir_p) if maildir_p else maildir_p
|
||||
opts.maildir_create = bool(
|
||||
maildir_api_config.getboolean("maildir_create", fallback=False)
|
||||
maildir_api_config.getboolean(
|
||||
"maildir_create",
|
||||
fallback=maildir_api_config.getboolean("create", fallback=False),
|
||||
)
|
||||
)
|
||||
|
||||
if "log_analytics" in config.sections():
|
||||
@@ -823,6 +943,38 @@ def _parse_config_file(config_file, opts):
|
||||
return index_prefix_domain_map
|
||||
|
||||
|
||||
class _ElasticsearchHandle:
|
||||
"""Sentinel so Elasticsearch participates in _close_output_clients."""
|
||||
|
||||
def close(self):
|
||||
try:
|
||||
conn = elastic.connections.get_connection()
|
||||
if not isinstance(conn, str):
|
||||
conn.close()
|
||||
except Exception:
|
||||
pass
|
||||
try:
|
||||
elastic.connections.remove_connection("default")
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
class _OpenSearchHandle:
|
||||
"""Sentinel so OpenSearch participates in _close_output_clients."""
|
||||
|
||||
def close(self):
|
||||
try:
|
||||
conn = opensearch.connections.get_connection()
|
||||
if not isinstance(conn, str):
|
||||
conn.close()
|
||||
except Exception:
|
||||
pass
|
||||
try:
|
||||
opensearch.connections.remove_connection("default")
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
def _init_output_clients(opts):
|
||||
"""Create output clients based on current opts.
|
||||
|
||||
@@ -834,78 +986,8 @@ def _init_output_clients(opts):
|
||||
"""
|
||||
clients = {}
|
||||
|
||||
if opts.save_aggregate or opts.save_forensic or opts.save_smtp_tls:
|
||||
if opts.elasticsearch_hosts:
|
||||
es_aggregate_index = "dmarc_aggregate"
|
||||
es_forensic_index = "dmarc_forensic"
|
||||
es_smtp_tls_index = "smtp_tls"
|
||||
if opts.elasticsearch_index_suffix:
|
||||
suffix = opts.elasticsearch_index_suffix
|
||||
es_aggregate_index = "{0}_{1}".format(es_aggregate_index, suffix)
|
||||
es_forensic_index = "{0}_{1}".format(es_forensic_index, suffix)
|
||||
es_smtp_tls_index = "{0}_{1}".format(es_smtp_tls_index, suffix)
|
||||
if opts.elasticsearch_index_prefix:
|
||||
prefix = opts.elasticsearch_index_prefix
|
||||
es_aggregate_index = "{0}{1}".format(prefix, es_aggregate_index)
|
||||
es_forensic_index = "{0}{1}".format(prefix, es_forensic_index)
|
||||
es_smtp_tls_index = "{0}{1}".format(prefix, es_smtp_tls_index)
|
||||
elastic_timeout_value = (
|
||||
float(opts.elasticsearch_timeout)
|
||||
if opts.elasticsearch_timeout is not None
|
||||
else 60.0
|
||||
)
|
||||
elastic.set_hosts(
|
||||
opts.elasticsearch_hosts,
|
||||
use_ssl=opts.elasticsearch_ssl,
|
||||
ssl_cert_path=opts.elasticsearch_ssl_cert_path,
|
||||
username=opts.elasticsearch_username,
|
||||
password=opts.elasticsearch_password,
|
||||
api_key=opts.elasticsearch_api_key,
|
||||
timeout=elastic_timeout_value,
|
||||
)
|
||||
elastic.migrate_indexes(
|
||||
aggregate_indexes=[es_aggregate_index],
|
||||
forensic_indexes=[es_forensic_index],
|
||||
)
|
||||
|
||||
if opts.opensearch_hosts:
|
||||
os_aggregate_index = "dmarc_aggregate"
|
||||
os_forensic_index = "dmarc_forensic"
|
||||
os_smtp_tls_index = "smtp_tls"
|
||||
if opts.opensearch_index_suffix:
|
||||
suffix = opts.opensearch_index_suffix
|
||||
os_aggregate_index = "{0}_{1}".format(os_aggregate_index, suffix)
|
||||
os_forensic_index = "{0}_{1}".format(os_forensic_index, suffix)
|
||||
os_smtp_tls_index = "{0}_{1}".format(os_smtp_tls_index, suffix)
|
||||
if opts.opensearch_index_prefix:
|
||||
prefix = opts.opensearch_index_prefix
|
||||
os_aggregate_index = "{0}{1}".format(prefix, os_aggregate_index)
|
||||
os_forensic_index = "{0}{1}".format(prefix, os_forensic_index)
|
||||
os_smtp_tls_index = "{0}{1}".format(prefix, os_smtp_tls_index)
|
||||
opensearch_timeout_value = (
|
||||
float(opts.opensearch_timeout)
|
||||
if opts.opensearch_timeout is not None
|
||||
else 60.0
|
||||
)
|
||||
opensearch.set_hosts(
|
||||
opts.opensearch_hosts,
|
||||
use_ssl=opts.opensearch_ssl,
|
||||
ssl_cert_path=opts.opensearch_ssl_cert_path,
|
||||
username=opts.opensearch_username,
|
||||
password=opts.opensearch_password,
|
||||
api_key=opts.opensearch_api_key,
|
||||
timeout=opensearch_timeout_value,
|
||||
auth_type=opts.opensearch_auth_type,
|
||||
aws_region=opts.opensearch_aws_region,
|
||||
aws_service=opts.opensearch_aws_service,
|
||||
)
|
||||
opensearch.migrate_indexes(
|
||||
aggregate_indexes=[os_aggregate_index],
|
||||
forensic_indexes=[os_forensic_index],
|
||||
)
|
||||
|
||||
if opts.s3_bucket:
|
||||
try:
|
||||
try:
|
||||
if opts.s3_bucket:
|
||||
clients["s3_client"] = s3.S3Client(
|
||||
bucket_name=opts.s3_bucket,
|
||||
bucket_path=opts.s3_path,
|
||||
@@ -914,11 +996,11 @@ def _init_output_clients(opts):
|
||||
access_key_id=opts.s3_access_key_id,
|
||||
secret_access_key=opts.s3_secret_access_key,
|
||||
)
|
||||
except Exception:
|
||||
logger.warning("Failed to initialize S3 client; skipping", exc_info=True)
|
||||
except Exception as e:
|
||||
raise RuntimeError(f"S3: {e}") from e
|
||||
|
||||
if opts.syslog_server:
|
||||
try:
|
||||
try:
|
||||
if opts.syslog_server:
|
||||
clients["syslog_client"] = syslog.SyslogClient(
|
||||
server_name=opts.syslog_server,
|
||||
server_port=int(opts.syslog_port),
|
||||
@@ -934,73 +1016,150 @@ def _init_output_clients(opts):
|
||||
if opts.syslog_retry_delay is not None
|
||||
else 5,
|
||||
)
|
||||
except Exception:
|
||||
logger.warning(
|
||||
"Failed to initialize syslog client; skipping", exc_info=True
|
||||
)
|
||||
except Exception as e:
|
||||
raise RuntimeError(f"Syslog: {e}") from e
|
||||
|
||||
if opts.hec:
|
||||
if opts.hec_token is None or opts.hec_index is None:
|
||||
raise ConfigurationError(
|
||||
"HEC token and HEC index are required when using HEC URL"
|
||||
)
|
||||
verify = True
|
||||
if opts.hec_skip_certificate_verification:
|
||||
verify = False
|
||||
try:
|
||||
verify = True
|
||||
if opts.hec_skip_certificate_verification:
|
||||
verify = False
|
||||
clients["hec_client"] = splunk.HECClient(
|
||||
opts.hec, opts.hec_token, opts.hec_index, verify=verify
|
||||
)
|
||||
except Exception:
|
||||
logger.warning(
|
||||
"Failed to initialize Splunk HEC client; skipping", exc_info=True
|
||||
)
|
||||
except Exception as e:
|
||||
raise RuntimeError(f"Splunk HEC: {e}") from e
|
||||
|
||||
if opts.kafka_hosts:
|
||||
ssl_context = None
|
||||
if opts.kafka_ssl:
|
||||
ssl_context = create_default_context()
|
||||
try:
|
||||
if opts.kafka_hosts:
|
||||
ssl_context = None
|
||||
if opts.kafka_skip_certificate_verification:
|
||||
logger.debug("Skipping Kafka certificate verification")
|
||||
ssl_context = create_default_context()
|
||||
ssl_context.check_hostname = False
|
||||
ssl_context.verify_mode = CERT_NONE
|
||||
try:
|
||||
clients["kafka_client"] = kafkaclient.KafkaClient(
|
||||
opts.kafka_hosts,
|
||||
username=opts.kafka_username,
|
||||
password=opts.kafka_password,
|
||||
ssl=opts.kafka_ssl,
|
||||
ssl_context=ssl_context,
|
||||
)
|
||||
except Exception:
|
||||
logger.warning("Failed to initialize Kafka client; skipping", exc_info=True)
|
||||
except Exception as e:
|
||||
raise RuntimeError(f"Kafka: {e}") from e
|
||||
|
||||
if opts.gelf_host:
|
||||
try:
|
||||
try:
|
||||
if opts.gelf_host:
|
||||
clients["gelf_client"] = gelf.GelfClient(
|
||||
host=opts.gelf_host,
|
||||
port=int(opts.gelf_port),
|
||||
mode=opts.gelf_mode,
|
||||
)
|
||||
except Exception:
|
||||
logger.warning("Failed to initialize GELF client; skipping", exc_info=True)
|
||||
except Exception as e:
|
||||
raise RuntimeError(f"GELF: {e}") from e
|
||||
|
||||
if (
|
||||
opts.webhook_aggregate_url
|
||||
or opts.webhook_forensic_url
|
||||
or opts.webhook_smtp_tls_url
|
||||
):
|
||||
try:
|
||||
try:
|
||||
if (
|
||||
opts.webhook_aggregate_url
|
||||
or opts.webhook_forensic_url
|
||||
or opts.webhook_smtp_tls_url
|
||||
):
|
||||
clients["webhook_client"] = webhook.WebhookClient(
|
||||
aggregate_url=opts.webhook_aggregate_url,
|
||||
forensic_url=opts.webhook_forensic_url,
|
||||
smtp_tls_url=opts.webhook_smtp_tls_url,
|
||||
timeout=opts.webhook_timeout,
|
||||
)
|
||||
except Exception:
|
||||
logger.warning(
|
||||
"Failed to initialize webhook client; skipping", exc_info=True
|
||||
)
|
||||
except Exception as e:
|
||||
raise RuntimeError(f"Webhook: {e}") from e
|
||||
|
||||
# Elasticsearch and OpenSearch mutate module-level global state via
|
||||
# connections.create_connection(), which cannot be rolled back if a later
|
||||
# step fails. Initialise them last so that all other clients are created
|
||||
# successfully first; this minimises the window for partial-init problems
|
||||
# during config reload.
|
||||
if opts.save_aggregate or opts.save_forensic or opts.save_smtp_tls:
|
||||
try:
|
||||
if opts.elasticsearch_hosts:
|
||||
es_aggregate_index = "dmarc_aggregate"
|
||||
es_forensic_index = "dmarc_forensic"
|
||||
es_smtp_tls_index = "smtp_tls"
|
||||
if opts.elasticsearch_index_suffix:
|
||||
suffix = opts.elasticsearch_index_suffix
|
||||
es_aggregate_index = "{0}_{1}".format(es_aggregate_index, suffix)
|
||||
es_forensic_index = "{0}_{1}".format(es_forensic_index, suffix)
|
||||
es_smtp_tls_index = "{0}_{1}".format(es_smtp_tls_index, suffix)
|
||||
if opts.elasticsearch_index_prefix:
|
||||
prefix = opts.elasticsearch_index_prefix
|
||||
es_aggregate_index = "{0}{1}".format(prefix, es_aggregate_index)
|
||||
es_forensic_index = "{0}{1}".format(prefix, es_forensic_index)
|
||||
es_smtp_tls_index = "{0}{1}".format(prefix, es_smtp_tls_index)
|
||||
elastic_timeout_value = (
|
||||
float(opts.elasticsearch_timeout)
|
||||
if opts.elasticsearch_timeout is not None
|
||||
else 60.0
|
||||
)
|
||||
elastic.set_hosts(
|
||||
opts.elasticsearch_hosts,
|
||||
use_ssl=opts.elasticsearch_ssl,
|
||||
ssl_cert_path=opts.elasticsearch_ssl_cert_path,
|
||||
skip_certificate_verification=opts.elasticsearch_skip_certificate_verification,
|
||||
username=opts.elasticsearch_username,
|
||||
password=opts.elasticsearch_password,
|
||||
api_key=opts.elasticsearch_api_key,
|
||||
timeout=elastic_timeout_value,
|
||||
)
|
||||
elastic.migrate_indexes(
|
||||
aggregate_indexes=[es_aggregate_index],
|
||||
forensic_indexes=[es_forensic_index],
|
||||
)
|
||||
clients["elasticsearch"] = _ElasticsearchHandle()
|
||||
except Exception as e:
|
||||
raise RuntimeError(f"Elasticsearch: {e}") from e
|
||||
|
||||
try:
|
||||
if opts.opensearch_hosts:
|
||||
os_aggregate_index = "dmarc_aggregate"
|
||||
os_forensic_index = "dmarc_forensic"
|
||||
os_smtp_tls_index = "smtp_tls"
|
||||
if opts.opensearch_index_suffix:
|
||||
suffix = opts.opensearch_index_suffix
|
||||
os_aggregate_index = "{0}_{1}".format(os_aggregate_index, suffix)
|
||||
os_forensic_index = "{0}_{1}".format(os_forensic_index, suffix)
|
||||
os_smtp_tls_index = "{0}_{1}".format(os_smtp_tls_index, suffix)
|
||||
if opts.opensearch_index_prefix:
|
||||
prefix = opts.opensearch_index_prefix
|
||||
os_aggregate_index = "{0}{1}".format(prefix, os_aggregate_index)
|
||||
os_forensic_index = "{0}{1}".format(prefix, os_forensic_index)
|
||||
os_smtp_tls_index = "{0}{1}".format(prefix, os_smtp_tls_index)
|
||||
opensearch_timeout_value = (
|
||||
float(opts.opensearch_timeout)
|
||||
if opts.opensearch_timeout is not None
|
||||
else 60.0
|
||||
)
|
||||
opensearch.set_hosts(
|
||||
opts.opensearch_hosts,
|
||||
use_ssl=opts.opensearch_ssl,
|
||||
ssl_cert_path=opts.opensearch_ssl_cert_path,
|
||||
skip_certificate_verification=opts.opensearch_skip_certificate_verification,
|
||||
username=opts.opensearch_username,
|
||||
password=opts.opensearch_password,
|
||||
api_key=opts.opensearch_api_key,
|
||||
timeout=opensearch_timeout_value,
|
||||
auth_type=opts.opensearch_auth_type,
|
||||
aws_region=opts.opensearch_aws_region,
|
||||
aws_service=opts.opensearch_aws_service,
|
||||
)
|
||||
opensearch.migrate_indexes(
|
||||
aggregate_indexes=[os_aggregate_index],
|
||||
forensic_indexes=[os_forensic_index],
|
||||
)
|
||||
clients["opensearch"] = _OpenSearchHandle()
|
||||
except Exception as e:
|
||||
raise RuntimeError(f"OpenSearch: {e}") from e
|
||||
|
||||
return clients
|
||||
|
||||
@@ -1032,22 +1191,24 @@ def _main():
|
||||
if "policy_published" in report:
|
||||
domain = report["policy_published"]["domain"]
|
||||
elif "reported_domain" in report:
|
||||
domain = report("reported_domain")
|
||||
domain = report["reported_domain"]
|
||||
elif "policies" in report:
|
||||
domain = report["policies"][0]["domain"]
|
||||
domain = report["policies"][0]["policy_domain"]
|
||||
if domain:
|
||||
domain = get_base_domain(domain)
|
||||
for prefix in index_prefix_domain_map:
|
||||
if domain in index_prefix_domain_map[prefix]:
|
||||
prefix = (
|
||||
prefix.lower()
|
||||
.strip()
|
||||
.strip("_")
|
||||
.replace(" ", "_")
|
||||
.replace("-", "_")
|
||||
)
|
||||
prefix = f"{prefix}_"
|
||||
return prefix
|
||||
if domain:
|
||||
domain = domain.lower()
|
||||
for prefix in index_prefix_domain_map:
|
||||
if domain in index_prefix_domain_map[prefix]:
|
||||
prefix = (
|
||||
prefix.lower()
|
||||
.strip()
|
||||
.strip("_")
|
||||
.replace(" ", "_")
|
||||
.replace("-", "_")
|
||||
)
|
||||
prefix = f"{prefix}_"
|
||||
return prefix
|
||||
return None
|
||||
|
||||
def process_reports(reports_):
|
||||
@@ -1058,6 +1219,22 @@ def _main():
|
||||
logger.error(message)
|
||||
output_errors.append(message)
|
||||
|
||||
if index_prefix_domain_map is not None:
|
||||
filtered_tls = []
|
||||
for report in reports_.get("smtp_tls_reports", []):
|
||||
if get_index_prefix(report) is not None:
|
||||
filtered_tls.append(report)
|
||||
else:
|
||||
domain = "unknown"
|
||||
if "policies" in report and report["policies"]:
|
||||
domain = report["policies"][0].get("policy_domain", "unknown")
|
||||
logger.debug(
|
||||
"Ignoring SMTP TLS report for domain not in "
|
||||
"index_prefix_domain_map: %s",
|
||||
domain,
|
||||
)
|
||||
reports_["smtp_tls_reports"] = filtered_tls
|
||||
|
||||
indent_value = 2 if opts.prettify_json else None
|
||||
output_str = "{0}\n".format(
|
||||
json.dumps(reports_, ensure_ascii=False, indent=indent_value)
|
||||
@@ -1529,6 +1706,7 @@ def _main():
|
||||
elasticsearch_index_prefix=None,
|
||||
elasticsearch_ssl=True,
|
||||
elasticsearch_ssl_cert_path=None,
|
||||
elasticsearch_skip_certificate_verification=False,
|
||||
elasticsearch_monthly_indexes=False,
|
||||
elasticsearch_username=None,
|
||||
elasticsearch_password=None,
|
||||
@@ -1541,6 +1719,7 @@ def _main():
|
||||
opensearch_index_prefix=None,
|
||||
opensearch_ssl=True,
|
||||
opensearch_ssl_cert_path=None,
|
||||
opensearch_skip_certificate_verification=False,
|
||||
opensearch_monthly_indexes=False,
|
||||
opensearch_username=None,
|
||||
opensearch_password=None,
|
||||
@@ -1623,11 +1802,18 @@ def _main():
|
||||
|
||||
index_prefix_domain_map = None
|
||||
|
||||
if args.config_file:
|
||||
config_file = args.config_file or os.environ.get("PARSEDMARC_CONFIG_FILE")
|
||||
has_env_config = any(
|
||||
k.startswith("PARSEDMARC_") and k != "PARSEDMARC_CONFIG_FILE"
|
||||
for k in os.environ
|
||||
)
|
||||
|
||||
if config_file or has_env_config:
|
||||
try:
|
||||
index_prefix_domain_map = _parse_config_file(args.config_file, opts)
|
||||
config = _load_config(config_file)
|
||||
index_prefix_domain_map = _parse_config(config, opts)
|
||||
except ConfigurationError as e:
|
||||
logger.error(str(e))
|
||||
logger.critical(str(e))
|
||||
exit(-1)
|
||||
|
||||
logger.setLevel(logging.ERROR)
|
||||
@@ -1649,6 +1835,8 @@ def _main():
|
||||
except Exception as error:
|
||||
logger.warning("Unable to write to log file: {}".format(error))
|
||||
|
||||
opts.active_log_file = opts.log_file
|
||||
|
||||
if (
|
||||
opts.imap_host is None
|
||||
and opts.graph_client_id is None
|
||||
@@ -1664,14 +1852,8 @@ def _main():
|
||||
# Initialize output clients
|
||||
try:
|
||||
clients = _init_output_clients(opts)
|
||||
except elastic.ElasticsearchError:
|
||||
logger.exception("Elasticsearch Error")
|
||||
exit(1)
|
||||
except opensearch.OpenSearchError:
|
||||
logger.exception("OpenSearch Error")
|
||||
exit(1)
|
||||
except ConfigurationError as e:
|
||||
logger.error(str(e))
|
||||
logger.critical(str(e))
|
||||
exit(1)
|
||||
except Exception as error_:
|
||||
logger.error("Output client error: {0}".format(error_))
|
||||
@@ -1807,8 +1989,9 @@ def _main():
|
||||
try:
|
||||
if opts.imap_user is None or opts.imap_password is None:
|
||||
logger.error(
|
||||
"IMAP user and password must be specified ifhost is specified"
|
||||
"IMAP user and password must be specified if host is specified"
|
||||
)
|
||||
exit(1)
|
||||
|
||||
ssl = True
|
||||
verify = True
|
||||
@@ -1985,8 +2168,9 @@ def _main():
|
||||
|
||||
def _handle_sighup(signum, frame):
|
||||
nonlocal _reload_requested
|
||||
# Logging is not async-signal-safe; only set the flag here.
|
||||
# The log message is emitted from the main loop when the flag is read.
|
||||
_reload_requested = True
|
||||
logger.info("SIGHUP received, config will reload after current batch")
|
||||
|
||||
if hasattr(signal, "SIGHUP"):
|
||||
signal.signal(signal.SIGHUP, _handle_sighup)
|
||||
@@ -1995,6 +2179,12 @@ def _main():
|
||||
logger.info("Watching for email - Quit with ctrl-c")
|
||||
|
||||
while True:
|
||||
# Re-check mailbox_watch in case a config reload disabled watch mode
|
||||
if not opts.mailbox_watch:
|
||||
logger.info(
|
||||
"Mailbox watch disabled in reloaded configuration, stopping watcher"
|
||||
)
|
||||
break
|
||||
try:
|
||||
watch_inbox(
|
||||
mailbox_connection=mailbox_connection,
|
||||
@@ -2015,7 +2205,7 @@ def _main():
|
||||
reverse_dns_map_url=opts.reverse_dns_map_url,
|
||||
offline=opts.offline,
|
||||
normalize_timespan_threshold_hours=normalize_timespan_threshold_hours_value,
|
||||
should_reload=lambda: _reload_requested,
|
||||
config_reloading=lambda: _reload_requested,
|
||||
)
|
||||
except FileExistsError as error:
|
||||
logger.error("{0}".format(error.__str__()))
|
||||
@@ -2027,21 +2217,36 @@ def _main():
|
||||
if not _reload_requested:
|
||||
break
|
||||
|
||||
# Reload configuration
|
||||
# Reload configuration — emit the log message here (not in the
|
||||
# signal handler, which is not async-signal-safe), then clear the
|
||||
# flag so that any new SIGHUP arriving while we reload will be
|
||||
# captured for the next iteration rather than being silently dropped.
|
||||
logger.info("SIGHUP received, config will reload after current batch")
|
||||
_reload_requested = False
|
||||
logger.info("Reloading configuration...")
|
||||
try:
|
||||
# Build a fresh opts starting from CLI-only defaults so that
|
||||
# sections removed from the config file actually take effect.
|
||||
new_opts = Namespace(**vars(opts_from_cli))
|
||||
new_index_prefix_domain_map = _parse_config_file(
|
||||
args.config_file, new_opts
|
||||
)
|
||||
new_config = _load_config(config_file)
|
||||
new_index_prefix_domain_map = _parse_config(new_config, new_opts)
|
||||
new_clients = _init_output_clients(new_opts)
|
||||
|
||||
# All steps succeeded — commit the changes atomically.
|
||||
_close_output_clients(clients)
|
||||
clients = new_clients
|
||||
index_prefix_domain_map = new_index_prefix_domain_map
|
||||
|
||||
# Reload the reverse DNS map so changes to the
|
||||
# map path/URL in the config take effect.
|
||||
load_reverse_dns_map(
|
||||
REVERSE_DNS_MAP,
|
||||
always_use_local_file=new_opts.always_use_local_files,
|
||||
local_file_path=new_opts.reverse_dns_map_path,
|
||||
url=new_opts.reverse_dns_map_url,
|
||||
offline=new_opts.offline,
|
||||
)
|
||||
|
||||
for k, v in vars(new_opts).items():
|
||||
setattr(opts, k, v)
|
||||
|
||||
|
||||
@@ -1,3 +1,3 @@
|
||||
__version__ = "9.2.1"
|
||||
__version__ = "9.5.4"
|
||||
|
||||
USER_AGENT = f"parsedmarc/{__version__}"
|
||||
|
||||
@@ -268,6 +268,7 @@ def set_hosts(
|
||||
*,
|
||||
use_ssl: bool = False,
|
||||
ssl_cert_path: Optional[str] = None,
|
||||
skip_certificate_verification: bool = False,
|
||||
username: Optional[str] = None,
|
||||
password: Optional[str] = None,
|
||||
api_key: Optional[str] = None,
|
||||
@@ -280,6 +281,7 @@ def set_hosts(
|
||||
hosts (str | list[str]): A single hostname or URL, or list of hostnames or URLs
|
||||
use_ssl (bool): Use an HTTPS connection to the server
|
||||
ssl_cert_path (str): Path to the certificate chain
|
||||
skip_certificate_verification (bool): Skip certificate verification
|
||||
username (str): The username to use for authentication
|
||||
password (str): The password to use for authentication
|
||||
api_key (str): The Base64 encoded API key to use for authentication
|
||||
@@ -291,10 +293,11 @@ def set_hosts(
|
||||
if use_ssl:
|
||||
conn_params["use_ssl"] = True
|
||||
if ssl_cert_path:
|
||||
conn_params["verify_certs"] = True
|
||||
conn_params["ca_certs"] = ssl_cert_path
|
||||
else:
|
||||
if skip_certificate_verification:
|
||||
conn_params["verify_certs"] = False
|
||||
else:
|
||||
conn_params["verify_certs"] = True
|
||||
if username and password:
|
||||
conn_params["http_auth"] = username + ":" + password
|
||||
if api_key:
|
||||
@@ -735,6 +738,7 @@ def save_smtp_tls_report_to_elasticsearch(
|
||||
index_date = begin_date.strftime("%Y-%m")
|
||||
else:
|
||||
index_date = begin_date.strftime("%Y-%m-%d")
|
||||
report = report.copy()
|
||||
report["begin_date"] = begin_date
|
||||
report["end_date"] = end_date
|
||||
|
||||
|
||||
@@ -3,9 +3,7 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import logging.handlers
|
||||
import threading
|
||||
from typing import Any
|
||||
|
||||
from pygelf import GelfTcpHandler, GelfTlsHandler, GelfUdpHandler
|
||||
|
||||
@@ -14,6 +12,7 @@ from parsedmarc import (
|
||||
parsed_forensic_reports_to_csv_rows,
|
||||
parsed_smtp_tls_reports_to_csv_rows,
|
||||
)
|
||||
from parsedmarc.types import AggregateReport, ForensicReport, SMTPTLSReport
|
||||
|
||||
log_context_data = threading.local()
|
||||
|
||||
@@ -37,7 +36,7 @@ class GelfClient(object):
|
||||
"""
|
||||
self.host = host
|
||||
self.port = port
|
||||
self.logger = logging.getLogger("parsedmarc_syslog")
|
||||
self.logger = logging.getLogger("parsedmarc_gelf")
|
||||
self.logger.setLevel(logging.INFO)
|
||||
self.logger.addFilter(ContextFilter())
|
||||
self.gelf_mode = {
|
||||
@@ -50,7 +49,7 @@ class GelfClient(object):
|
||||
)
|
||||
self.logger.addHandler(self.handler)
|
||||
|
||||
def save_aggregate_report_to_gelf(self, aggregate_reports: list[dict[str, Any]]):
|
||||
def save_aggregate_report_to_gelf(self, aggregate_reports: list[AggregateReport]):
|
||||
rows = parsed_aggregate_reports_to_csv_rows(aggregate_reports)
|
||||
for row in rows:
|
||||
log_context_data.parsedmarc = row
|
||||
@@ -58,13 +57,13 @@ class GelfClient(object):
|
||||
|
||||
log_context_data.parsedmarc = None
|
||||
|
||||
def save_forensic_report_to_gelf(self, forensic_reports: list[dict[str, Any]]):
|
||||
def save_forensic_report_to_gelf(self, forensic_reports: list[ForensicReport]):
|
||||
rows = parsed_forensic_reports_to_csv_rows(forensic_reports)
|
||||
for row in rows:
|
||||
log_context_data.parsedmarc = row
|
||||
self.logger.info("parsedmarc forensic report")
|
||||
|
||||
def save_smtp_tls_report_to_gelf(self, smtp_tls_reports: dict[str, Any]):
|
||||
def save_smtp_tls_report_to_gelf(self, smtp_tls_reports: SMTPTLSReport):
|
||||
rows = parsed_smtp_tls_reports_to_csv_rows(smtp_tls_reports)
|
||||
for row in rows:
|
||||
log_context_data.parsedmarc = row
|
||||
|
||||
@@ -55,6 +55,7 @@ def _get_creds(
|
||||
flow = InstalledAppFlow.from_client_secrets_file(credentials_file, scopes)
|
||||
creds = flow.run_local_server(open_browser=False, oauth2_port=oauth2_port)
|
||||
# Save the credentials for the next run
|
||||
Path(token_file).parent.mkdir(parents=True, exist_ok=True)
|
||||
with Path(token_file).open("w") as token:
|
||||
token.write(creds.to_json())
|
||||
return creds
|
||||
@@ -175,13 +176,13 @@ class GmailConnection(MailboxConnection):
|
||||
# Not needed
|
||||
pass
|
||||
|
||||
def watch(self, check_callback, check_timeout, should_reload=None):
|
||||
def watch(self, check_callback, check_timeout, config_reloading=None):
|
||||
"""Checks the mailbox for new messages every n seconds"""
|
||||
while True:
|
||||
if should_reload and should_reload():
|
||||
if config_reloading and config_reloading():
|
||||
return
|
||||
sleep(check_timeout)
|
||||
if should_reload and should_reload():
|
||||
if config_reloading and config_reloading():
|
||||
return
|
||||
check_callback(self)
|
||||
|
||||
|
||||
@@ -56,6 +56,7 @@ def _load_token(token_path: Path) -> Optional[str]:
|
||||
|
||||
def _cache_auth_record(record: AuthenticationRecord, token_path: Path):
|
||||
token = record.serialize()
|
||||
token_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
with token_path.open("w") as token_file:
|
||||
token_file.write(token)
|
||||
|
||||
@@ -278,12 +279,14 @@ class MSGraphConnection(MailboxConnection):
|
||||
# Not needed
|
||||
pass
|
||||
|
||||
def watch(self, check_callback, check_timeout, should_reload=None):
|
||||
def watch(self, check_callback, check_timeout, config_reloading=None):
|
||||
"""Checks the mailbox for new messages every n seconds"""
|
||||
while True:
|
||||
if should_reload and should_reload():
|
||||
if config_reloading and config_reloading():
|
||||
return
|
||||
sleep(check_timeout)
|
||||
if config_reloading and config_reloading():
|
||||
return
|
||||
check_callback(self)
|
||||
|
||||
@lru_cache(maxsize=10)
|
||||
|
||||
@@ -81,7 +81,7 @@ class IMAPConnection(MailboxConnection):
|
||||
def keepalive(self):
|
||||
self._client.noop()
|
||||
|
||||
def watch(self, check_callback, check_timeout, should_reload=None):
|
||||
def watch(self, check_callback, check_timeout, config_reloading=None):
|
||||
"""
|
||||
Use an IDLE IMAP connection to parse incoming emails,
|
||||
and pass the results to a callback function
|
||||
@@ -94,6 +94,8 @@ class IMAPConnection(MailboxConnection):
|
||||
check_callback(self)
|
||||
|
||||
while True:
|
||||
if config_reloading and config_reloading():
|
||||
return
|
||||
try:
|
||||
IMAPClient(
|
||||
host=self._client.host,
|
||||
@@ -111,5 +113,5 @@ class IMAPConnection(MailboxConnection):
|
||||
except Exception as e:
|
||||
logger.warning("IMAP connection error. {0}. Reconnecting...".format(e))
|
||||
sleep(check_timeout)
|
||||
if should_reload and should_reload():
|
||||
if config_reloading and config_reloading():
|
||||
return
|
||||
|
||||
@@ -28,5 +28,5 @@ class MailboxConnection(ABC):
|
||||
def keepalive(self):
|
||||
raise NotImplementedError
|
||||
|
||||
def watch(self, check_callback, check_timeout, should_reload=None):
|
||||
def watch(self, check_callback, check_timeout, config_reloading=None):
|
||||
raise NotImplementedError
|
||||
|
||||
@@ -19,29 +19,54 @@ class MaildirConnection(MailboxConnection):
|
||||
):
|
||||
self._maildir_path = maildir_path
|
||||
self._maildir_create = maildir_create
|
||||
maildir_owner = os.stat(maildir_path).st_uid
|
||||
if os.getuid() != maildir_owner:
|
||||
if os.getuid() == 0:
|
||||
logger.warning(
|
||||
"Switching uid to {} to access Maildir".format(maildir_owner)
|
||||
)
|
||||
os.setuid(maildir_owner)
|
||||
try:
|
||||
maildir_owner = os.stat(maildir_path).st_uid
|
||||
except OSError:
|
||||
maildir_owner = None
|
||||
current_uid = os.getuid()
|
||||
if maildir_owner is not None and current_uid != maildir_owner:
|
||||
if current_uid == 0:
|
||||
try:
|
||||
logger.warning(
|
||||
"Switching uid to {} to access Maildir".format(maildir_owner)
|
||||
)
|
||||
os.setuid(maildir_owner)
|
||||
except OSError as e:
|
||||
logger.warning(
|
||||
"Failed to switch uid to {}: {}".format(maildir_owner, e)
|
||||
)
|
||||
else:
|
||||
ex = "runtime uid {} differ from maildir {} owner {}".format(
|
||||
os.getuid(), maildir_path, maildir_owner
|
||||
logger.warning(
|
||||
"Runtime uid {} differs from maildir {} owner {}. "
|
||||
"Access may fail if permissions are insufficient.".format(
|
||||
current_uid, maildir_path, maildir_owner
|
||||
)
|
||||
)
|
||||
raise Exception(ex)
|
||||
if maildir_create:
|
||||
for subdir in ("cur", "new", "tmp"):
|
||||
os.makedirs(os.path.join(maildir_path, subdir), exist_ok=True)
|
||||
self._client = mailbox.Maildir(maildir_path, create=maildir_create)
|
||||
self._active_folder: mailbox.Maildir = self._client
|
||||
self._subfolder_client: Dict[str, mailbox.Maildir] = {}
|
||||
|
||||
def _get_folder(self, folder_name: str) -> mailbox.Maildir:
|
||||
"""Return a cached subfolder handle, creating it if needed."""
|
||||
if folder_name not in self._subfolder_client:
|
||||
self._subfolder_client[folder_name] = self._client.add_folder(folder_name)
|
||||
return self._subfolder_client[folder_name]
|
||||
|
||||
def create_folder(self, folder_name: str):
|
||||
self._subfolder_client[folder_name] = self._client.add_folder(folder_name)
|
||||
self._get_folder(folder_name)
|
||||
|
||||
def fetch_messages(self, reports_folder: str, **kwargs):
|
||||
return self._client.keys()
|
||||
if reports_folder and reports_folder != "INBOX":
|
||||
self._active_folder = self._get_folder(reports_folder)
|
||||
else:
|
||||
self._active_folder = self._client
|
||||
return self._active_folder.keys()
|
||||
|
||||
def fetch_message(self, message_id: str) -> str:
|
||||
msg = self._client.get(message_id)
|
||||
msg = self._active_folder.get(message_id)
|
||||
if msg is not None:
|
||||
msg = msg.as_string()
|
||||
if msg is not None:
|
||||
@@ -49,26 +74,27 @@ class MaildirConnection(MailboxConnection):
|
||||
return ""
|
||||
|
||||
def delete_message(self, message_id: str):
|
||||
self._client.remove(message_id)
|
||||
self._active_folder.remove(message_id)
|
||||
|
||||
def move_message(self, message_id: str, folder_name: str):
|
||||
message_data = self._client.get(message_id)
|
||||
message_data = self._active_folder.get(message_id)
|
||||
if message_data is None:
|
||||
return
|
||||
if folder_name not in self._subfolder_client:
|
||||
self._subfolder_client[folder_name] = self._client.add_folder(folder_name)
|
||||
self._subfolder_client[folder_name].add(message_data)
|
||||
self._client.remove(message_id)
|
||||
dest = self._get_folder(folder_name)
|
||||
dest.add(message_data)
|
||||
self._active_folder.remove(message_id)
|
||||
|
||||
def keepalive(self):
|
||||
return
|
||||
|
||||
def watch(self, check_callback, check_timeout, should_reload=None):
|
||||
def watch(self, check_callback, check_timeout, config_reloading=None):
|
||||
while True:
|
||||
if config_reloading and config_reloading():
|
||||
return
|
||||
try:
|
||||
check_callback(self)
|
||||
except Exception as e:
|
||||
logger.warning("Maildir init error. {0}".format(e))
|
||||
if should_reload and should_reload():
|
||||
if config_reloading and config_reloading():
|
||||
return
|
||||
sleep(check_timeout)
|
||||
|
||||
@@ -271,6 +271,7 @@ def set_hosts(
|
||||
*,
|
||||
use_ssl: Optional[bool] = False,
|
||||
ssl_cert_path: Optional[str] = None,
|
||||
skip_certificate_verification: bool = False,
|
||||
username: Optional[str] = None,
|
||||
password: Optional[str] = None,
|
||||
api_key: Optional[str] = None,
|
||||
@@ -286,6 +287,7 @@ def set_hosts(
|
||||
hosts (str|list[str]): A single hostname or URL, or list of hostnames or URLs
|
||||
use_ssl (bool): Use an HTTPS connection to the server
|
||||
ssl_cert_path (str): Path to the certificate chain
|
||||
skip_certificate_verification (bool): Skip certificate verification
|
||||
username (str): The username to use for authentication
|
||||
password (str): The password to use for authentication
|
||||
api_key (str): The Base64 encoded API key to use for authentication
|
||||
@@ -300,10 +302,11 @@ def set_hosts(
|
||||
if use_ssl:
|
||||
conn_params["use_ssl"] = True
|
||||
if ssl_cert_path:
|
||||
conn_params["verify_certs"] = True
|
||||
conn_params["ca_certs"] = ssl_cert_path
|
||||
else:
|
||||
if skip_certificate_verification:
|
||||
conn_params["verify_certs"] = False
|
||||
else:
|
||||
conn_params["verify_certs"] = True
|
||||
normalized_auth_type = (auth_type or "basic").strip().lower()
|
||||
if normalized_auth_type == "awssigv4":
|
||||
if not aws_region:
|
||||
@@ -764,6 +767,7 @@ def save_smtp_tls_report_to_opensearch(
|
||||
index_date = begin_date.strftime("%Y-%m")
|
||||
else:
|
||||
index_date = begin_date.strftime("%Y-%m-%d")
|
||||
report = report.copy()
|
||||
report["begin_date"] = begin_date
|
||||
report["end_date"] = end_date
|
||||
|
||||
|
||||
@@ -93,3 +93,11 @@ class S3Client(object):
|
||||
self.bucket.put_object(
|
||||
Body=json.dumps(report), Key=object_path, Metadata=object_metadata
|
||||
)
|
||||
|
||||
def close(self):
|
||||
"""Clean up the boto3 resource."""
|
||||
try:
|
||||
if self.s3.meta is not None:
|
||||
self.s3.meta.client.close()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
@@ -58,7 +58,7 @@ class HECClient(object):
|
||||
self.source = source
|
||||
self.session = requests.Session()
|
||||
self.timeout = timeout
|
||||
self.session.verify = verify
|
||||
self.verify = verify
|
||||
self._common_data: dict[str, Union[str, int, float, dict]] = dict(
|
||||
host=self.host, source=self.source, index=self.index
|
||||
)
|
||||
@@ -124,10 +124,12 @@ class HECClient(object):
|
||||
data["event"] = new_report.copy()
|
||||
json_str += "{0}\n".format(json.dumps(data))
|
||||
|
||||
if not self.session.verify:
|
||||
if not self.verify:
|
||||
logger.debug("Skipping certificate verification for Splunk HEC")
|
||||
try:
|
||||
response = self.session.post(self.url, data=json_str, timeout=self.timeout)
|
||||
response = self.session.post(
|
||||
self.url, data=json_str, verify=self.verify, timeout=self.timeout
|
||||
)
|
||||
response = response.json()
|
||||
except Exception as e:
|
||||
raise SplunkError(e.__str__())
|
||||
@@ -161,10 +163,12 @@ class HECClient(object):
|
||||
data["event"] = report.copy()
|
||||
json_str += "{0}\n".format(json.dumps(data))
|
||||
|
||||
if not self.session.verify:
|
||||
if not self.verify:
|
||||
logger.debug("Skipping certificate verification for Splunk HEC")
|
||||
try:
|
||||
response = self.session.post(self.url, data=json_str, timeout=self.timeout)
|
||||
response = self.session.post(
|
||||
self.url, data=json_str, verify=self.verify, timeout=self.timeout
|
||||
)
|
||||
response = response.json()
|
||||
except Exception as e:
|
||||
raise SplunkError(e.__str__())
|
||||
@@ -198,12 +202,18 @@ class HECClient(object):
|
||||
data["event"] = report.copy()
|
||||
json_str += "{0}\n".format(json.dumps(data))
|
||||
|
||||
if not self.session.verify:
|
||||
if not self.verify:
|
||||
logger.debug("Skipping certificate verification for Splunk HEC")
|
||||
try:
|
||||
response = self.session.post(self.url, data=json_str, timeout=self.timeout)
|
||||
response = self.session.post(
|
||||
self.url, data=json_str, verify=self.verify, timeout=self.timeout
|
||||
)
|
||||
response = response.json()
|
||||
except Exception as e:
|
||||
raise SplunkError(e.__str__())
|
||||
if response["code"] != 0:
|
||||
raise SplunkError(response["text"])
|
||||
|
||||
def close(self):
|
||||
"""Close the underlying HTTP session."""
|
||||
self.session.close()
|
||||
|
||||
@@ -335,6 +335,76 @@ def get_ip_address_country(
|
||||
return country
|
||||
|
||||
|
||||
def load_reverse_dns_map(
|
||||
reverse_dns_map: ReverseDNSMap,
|
||||
*,
|
||||
always_use_local_file: bool = False,
|
||||
local_file_path: Optional[str] = None,
|
||||
url: Optional[str] = None,
|
||||
offline: bool = False,
|
||||
) -> None:
|
||||
"""
|
||||
Loads the reverse DNS map from a URL or local file.
|
||||
|
||||
Clears and repopulates the given map dict in place. If the map is
|
||||
fetched from a URL, that is tried first; on failure (or if offline/local
|
||||
mode is selected) the bundled CSV is used as a fallback.
|
||||
|
||||
Args:
|
||||
reverse_dns_map (dict): The map dict to populate (modified in place)
|
||||
always_use_local_file (bool): Always use a local map file
|
||||
local_file_path (str): Path to a local map file
|
||||
url (str): URL to a reverse DNS map
|
||||
offline (bool): Use the built-in copy of the reverse DNS map
|
||||
"""
|
||||
if url is None:
|
||||
url = (
|
||||
"https://raw.githubusercontent.com/domainaware"
|
||||
"/parsedmarc/master/parsedmarc/"
|
||||
"resources/maps/base_reverse_dns_map.csv"
|
||||
)
|
||||
|
||||
reverse_dns_map.clear()
|
||||
|
||||
def load_csv(_csv_file):
|
||||
reader = csv.DictReader(_csv_file)
|
||||
for row in reader:
|
||||
key = row["base_reverse_dns"].lower().strip()
|
||||
reverse_dns_map[key] = {
|
||||
"name": row["name"].strip(),
|
||||
"type": row["type"].strip(),
|
||||
}
|
||||
|
||||
csv_file = io.StringIO()
|
||||
|
||||
if not (offline or always_use_local_file):
|
||||
try:
|
||||
logger.debug(f"Trying to fetch reverse DNS map from {url}...")
|
||||
headers = {"User-Agent": USER_AGENT}
|
||||
response = requests.get(url, headers=headers)
|
||||
response.raise_for_status()
|
||||
csv_file.write(response.text)
|
||||
csv_file.seek(0)
|
||||
load_csv(csv_file)
|
||||
except requests.exceptions.RequestException as e:
|
||||
logger.warning(f"Failed to fetch reverse DNS map: {e}")
|
||||
except Exception:
|
||||
logger.warning("Not a valid CSV file")
|
||||
csv_file.seek(0)
|
||||
logging.debug("Response body:")
|
||||
logger.debug(csv_file.read())
|
||||
|
||||
if len(reverse_dns_map) == 0:
|
||||
logger.info("Loading included reverse DNS map...")
|
||||
path = str(
|
||||
files(parsedmarc.resources.maps).joinpath("base_reverse_dns_map.csv")
|
||||
)
|
||||
if local_file_path is not None:
|
||||
path = local_file_path
|
||||
with open(path) as csv_file:
|
||||
load_csv(csv_file)
|
||||
|
||||
|
||||
def get_service_from_reverse_dns_base_domain(
|
||||
base_domain,
|
||||
*,
|
||||
@@ -361,55 +431,21 @@ def get_service_from_reverse_dns_base_domain(
|
||||
"""
|
||||
|
||||
base_domain = base_domain.lower().strip()
|
||||
if url is None:
|
||||
url = (
|
||||
"https://raw.githubusercontent.com/domainaware"
|
||||
"/parsedmarc/master/parsedmarc/"
|
||||
"resources/maps/base_reverse_dns_map.csv"
|
||||
)
|
||||
reverse_dns_map_value: ReverseDNSMap
|
||||
if reverse_dns_map is None:
|
||||
reverse_dns_map_value = {}
|
||||
else:
|
||||
reverse_dns_map_value = reverse_dns_map
|
||||
|
||||
def load_csv(_csv_file):
|
||||
reader = csv.DictReader(_csv_file)
|
||||
for row in reader:
|
||||
key = row["base_reverse_dns"].lower().strip()
|
||||
reverse_dns_map_value[key] = {
|
||||
"name": row["name"],
|
||||
"type": row["type"],
|
||||
}
|
||||
|
||||
csv_file = io.StringIO()
|
||||
|
||||
if not (offline or always_use_local_file) and len(reverse_dns_map_value) == 0:
|
||||
try:
|
||||
logger.debug(f"Trying to fetch reverse DNS map from {url}...")
|
||||
headers = {"User-Agent": USER_AGENT}
|
||||
response = requests.get(url, headers=headers)
|
||||
response.raise_for_status()
|
||||
csv_file.write(response.text)
|
||||
csv_file.seek(0)
|
||||
load_csv(csv_file)
|
||||
except requests.exceptions.RequestException as e:
|
||||
logger.warning(f"Failed to fetch reverse DNS map: {e}")
|
||||
except Exception:
|
||||
logger.warning("Not a valid CSV file")
|
||||
csv_file.seek(0)
|
||||
logging.debug("Response body:")
|
||||
logger.debug(csv_file.read())
|
||||
|
||||
if len(reverse_dns_map_value) == 0:
|
||||
logger.info("Loading included reverse DNS map...")
|
||||
path = str(
|
||||
files(parsedmarc.resources.maps).joinpath("base_reverse_dns_map.csv")
|
||||
load_reverse_dns_map(
|
||||
reverse_dns_map_value,
|
||||
always_use_local_file=always_use_local_file,
|
||||
local_file_path=local_file_path,
|
||||
url=url,
|
||||
offline=offline,
|
||||
)
|
||||
if local_file_path is not None:
|
||||
path = local_file_path
|
||||
with open(path) as csv_file:
|
||||
load_csv(csv_file)
|
||||
|
||||
service: ReverseDNSService
|
||||
try:
|
||||
service = reverse_dns_map_value[base_domain]
|
||||
|
||||
@@ -50,7 +50,7 @@ dependencies = [
|
||||
"lxml>=4.4.0",
|
||||
"mailsuite>=1.11.2",
|
||||
"msgraph-core==0.2.2",
|
||||
"opensearch-py>=2.4.2,<=3.0.0",
|
||||
"opensearch-py>=2.4.2,<=4.0.0",
|
||||
"publicsuffixlist>=0.10.0",
|
||||
"pygelf>=0.4.2",
|
||||
"requests>=2.22.0",
|
||||
|
||||
Reference in New Issue
Block a user