mirror of
https://github.com/domainaware/parsedmarc.git
synced 2026-03-22 14:32:46 +00:00
Compare commits
8 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9849598100 | ||
|
|
e82f3e58a1 | ||
|
|
dd1a8fd461 | ||
|
|
81656c75e9 | ||
|
|
691b0fcd41 | ||
|
|
b9343a295f | ||
|
|
b51a62463f | ||
|
|
66ba5b0e5e |
17
.claude/settings.json
Normal file
17
.claude/settings.json
Normal file
@@ -0,0 +1,17 @@
|
||||
{
|
||||
"permissions": {
|
||||
"allow": [
|
||||
"Bash(python -c \"import py_compile; py_compile.compile\\(''parsedmarc/cli.py'', doraise=True\\)\")",
|
||||
"Bash(ruff check:*)",
|
||||
"Bash(ruff format:*)",
|
||||
"Bash(GITHUB_ACTIONS=true pytest --cov tests.py)",
|
||||
"Bash(ls tests*)",
|
||||
"Bash(GITHUB_ACTIONS=true python -m pytest --cov tests.py -x)",
|
||||
"Bash(GITHUB_ACTIONS=true python -m pytest tests.py -x -v)",
|
||||
"Bash(python -m pytest tests.py --no-header -q)"
|
||||
],
|
||||
"additionalDirectories": [
|
||||
"/tmp"
|
||||
]
|
||||
}
|
||||
}
|
||||
2
.gitignore
vendored
2
.gitignore
vendored
@@ -137,7 +137,7 @@ samples/private
|
||||
*.html
|
||||
*.sqlite-journal
|
||||
|
||||
parsedmarc.ini
|
||||
parsedmarc*.ini
|
||||
scratch.py
|
||||
|
||||
parsedmarc/resources/maps/base_reverse_dns.csv
|
||||
|
||||
29
CHANGELOG.md
29
CHANGELOG.md
@@ -1,5 +1,34 @@
|
||||
# Changelog
|
||||
|
||||
## 9.3.0
|
||||
|
||||
### Added
|
||||
|
||||
- SIGHUP-based configuration reload for watch mode — update output destinations, DNS/GeoIP settings, processing flags, and log level without restarting the service or interrupting in-progress report processing.
|
||||
- Use `systemctl reload parsedmarc` when running under `systemd`.
|
||||
- On a successful reload, old output clients are closed and recreated.
|
||||
- On a failed reload, the previous configuration remains fully active.
|
||||
- `close()` methods on `GelfClient`, `KafkaClient`, `SyslogClient`, `WebhookClient`, HECClient, and `S3Client` for clean resource teardown on reload.
|
||||
- `config_reloading` parameter on all `MailboxConnection.watch()` implementations and `watch_inbox()` to ensure SIGHUP never triggers a new email batch mid-reload.
|
||||
- Elasticsearch and OpenSearch connections are now tracked and cleaned up on reload via `_close_output_clients()`.
|
||||
- Extracted `_parse_config_file()` and `_init_output_clients()` from `_main()` in `cli.py` to support config reload and reduce code duplication.
|
||||
|
||||
### Fixed
|
||||
|
||||
- `get_index_prefix()` crashed on forensic reports with `TypeError` due to `report()` instead of `report[]` dict access.
|
||||
- Missing `exit(1)` after IMAP user/password validation failure allowed execution to continue with `None` credentials.
|
||||
|
||||
## 9.2.1
|
||||
|
||||
### Added
|
||||
|
||||
- Better checking of `msgraph` configuration (PR #695)
|
||||
|
||||
### Changed
|
||||
|
||||
- Updated `dbip-country-lite` database to version `2026-03`
|
||||
- DNS query error logging level from `warning` to `debug`
|
||||
|
||||
## 9.2.0
|
||||
|
||||
### Added
|
||||
|
||||
45
docker-compose.dashboard-dev.yml
Normal file
45
docker-compose.dashboard-dev.yml
Normal file
@@ -0,0 +1,45 @@
|
||||
name: parsedmarc-dashboards
|
||||
|
||||
include:
|
||||
- docker-compose.yml
|
||||
|
||||
services:
|
||||
kibana:
|
||||
image: docker.elastic.co/kibana/kibana:8.19.7
|
||||
environment:
|
||||
- ELASTICSEARCH_HOSTS=http://elasticsearch:9200
|
||||
ports:
|
||||
- "127.0.0.1:5601:5601"
|
||||
depends_on:
|
||||
elasticsearch:
|
||||
condition: service_healthy
|
||||
|
||||
opensearch-dashboards:
|
||||
image: opensearchproject/opensearch-dashboards:2
|
||||
environment:
|
||||
- OPENSEARCH_HOSTS=["https://opensearch:9200"]
|
||||
ports:
|
||||
- "127.0.0.1:5602:5601"
|
||||
depends_on:
|
||||
opensearch:
|
||||
condition: service_healthy
|
||||
|
||||
grafana:
|
||||
image: grafana/grafana:latest
|
||||
environment:
|
||||
- GF_SECURITY_ADMIN_PASSWORD=${GRAFANA_PASSWORD}
|
||||
- GF_INSTALL_PLUGINS=grafana-piechart-panel,grafana-worldmap-panel
|
||||
ports:
|
||||
- "127.0.0.1:3000:3000"
|
||||
depends_on:
|
||||
elasticsearch:
|
||||
condition: service_healthy
|
||||
|
||||
splunk:
|
||||
image: splunk/splunk:latest
|
||||
environment:
|
||||
- SPLUNK_START_ARGS=--accept-license
|
||||
- "SPLUNK_GENERAL_TERMS=--accept-sgt-current-at-splunk-com"
|
||||
- SPLUNK_PASSWORD=${SPLUNK_PASSWORD}
|
||||
ports:
|
||||
- "127.0.0.1:8000:8000"
|
||||
@@ -48,7 +48,7 @@ services:
|
||||
test:
|
||||
[
|
||||
"CMD-SHELL",
|
||||
"curl -s -XGET http://localhost:9201/_cluster/health?pretty | grep status | grep -q '\\(green\\|yellow\\)'"
|
||||
"curl -sk -u admin:${OPENSEARCH_INITIAL_ADMIN_PASSWORD} -XGET https://localhost:9200/_cluster/health?pretty | grep status | grep -q '\\(green\\|yellow\\)'"
|
||||
]
|
||||
interval: 10s
|
||||
timeout: 10s
|
||||
|
||||
@@ -404,6 +404,7 @@ The full set of configuration options are:
|
||||
retry_attempts = 3
|
||||
retry_delay = 5
|
||||
```
|
||||
|
||||
- `gmail_api`
|
||||
- `credentials_file` - str: Path to file containing the
|
||||
credentials, None to disable (Default: `None`)
|
||||
@@ -442,7 +443,7 @@ The full set of configuration options are:
|
||||
- `dcr_smtp_tls_stream` - str: The stream name for the SMTP TLS reports in the DCR
|
||||
|
||||
:::{note}
|
||||
Information regarding the setup of the Data Collection Rule can be found [here](https://learn.microsoft.com/en-us/azure/azure-monitor/logs/tutorial-logs-ingestion-portal).
|
||||
Information regarding the setup of the Data Collection Rule can be found [in the Azure documentation](https://learn.microsoft.com/en-us/azure/azure-monitor/logs/tutorial-logs-ingestion-portal).
|
||||
:::
|
||||
- `gelf`
|
||||
- `host` - str: The GELF server name or IP address
|
||||
@@ -602,6 +603,7 @@ After=network.target network-online.target elasticsearch.service
|
||||
|
||||
[Service]
|
||||
ExecStart=/opt/parsedmarc/venv/bin/parsedmarc -c /etc/parsedmarc.ini
|
||||
ExecReload=/bin/kill -HUP $MAINPID
|
||||
User=parsedmarc
|
||||
Group=parsedmarc
|
||||
Restart=always
|
||||
@@ -634,6 +636,51 @@ sudo service parsedmarc restart
|
||||
|
||||
:::
|
||||
|
||||
### Reloading configuration without restarting
|
||||
|
||||
When running in watch mode, `parsedmarc` supports reloading its
|
||||
configuration file without restarting the service or interrupting
|
||||
report processing that is already in progress. Send a `SIGHUP` signal
|
||||
to the process, or use `systemctl reload` if the unit file includes
|
||||
the `ExecReload` line shown above:
|
||||
|
||||
```bash
|
||||
sudo systemctl reload parsedmarc
|
||||
```
|
||||
|
||||
The reload takes effect after the current batch of reports finishes
|
||||
processing and all output operations (Elasticsearch, Kafka, S3, etc.)
|
||||
for that batch have completed. The following settings are reloaded:
|
||||
|
||||
- All output destinations (Elasticsearch, OpenSearch, Kafka, S3,
|
||||
Splunk, syslog, GELF, webhooks, Log Analytics)
|
||||
- Multi-tenant index prefix domain map (`index_prefix_domain_map` —
|
||||
the referenced YAML file is re-read on reload)
|
||||
- DNS and GeoIP settings (`nameservers`, `dns_timeout`, `ip_db_path`,
|
||||
`offline`, etc.)
|
||||
- Processing flags (`strip_attachment_payloads`, `batch_size`,
|
||||
`check_timeout`, etc.)
|
||||
- Log level (`debug`, `verbose`, `warnings`, `silent`)
|
||||
|
||||
Mailbox connection settings (IMAP host/credentials, Microsoft Graph,
|
||||
Gmail API, Maildir path) are **not** reloaded — changing those still
|
||||
requires a full restart.
|
||||
|
||||
On a **successful** reload, existing output client connections are
|
||||
closed and new ones are created from the updated configuration. The
|
||||
service then resumes watching with the new settings.
|
||||
|
||||
If the new configuration file contains errors (missing required
|
||||
settings, unreachable output destinations, etc.), the **entire reload
|
||||
is aborted** — no output clients are replaced and the previous
|
||||
configuration remains fully active. This means a typo in one section
|
||||
will not take down an otherwise working setup. Check the logs for
|
||||
details:
|
||||
|
||||
```bash
|
||||
journalctl -u parsedmarc.service -r
|
||||
```
|
||||
|
||||
To check the status of the service, run:
|
||||
|
||||
```bash
|
||||
|
||||
@@ -2195,6 +2195,7 @@ def watch_inbox(
|
||||
batch_size: int = 10,
|
||||
since: Optional[Union[datetime, date, str]] = None,
|
||||
normalize_timespan_threshold_hours: float = 24,
|
||||
config_reloading: Optional[Callable] = None,
|
||||
):
|
||||
"""
|
||||
Watches the mailbox for new messages and
|
||||
@@ -2222,6 +2223,8 @@ def watch_inbox(
|
||||
batch_size (int): Number of messages to read and process before saving
|
||||
since: Search for messages since certain time
|
||||
normalize_timespan_threshold_hours (float): Normalize timespans beyond this
|
||||
config_reloading: Optional callable that returns True when a config
|
||||
reload has been requested (e.g. via SIGHUP)
|
||||
"""
|
||||
|
||||
def check_callback(connection):
|
||||
@@ -2246,7 +2249,14 @@ def watch_inbox(
|
||||
)
|
||||
callback(res)
|
||||
|
||||
mailbox_connection.watch(check_callback=check_callback, check_timeout=check_timeout)
|
||||
watch_kwargs: dict = {
|
||||
"check_callback": check_callback,
|
||||
"check_timeout": check_timeout,
|
||||
}
|
||||
if config_reloading is not None:
|
||||
watch_kwargs["config_reloading"] = config_reloading
|
||||
|
||||
mailbox_connection.watch(**watch_kwargs)
|
||||
|
||||
|
||||
def append_json(
|
||||
|
||||
1906
parsedmarc/cli.py
1906
parsedmarc/cli.py
File diff suppressed because it is too large
Load Diff
@@ -1,3 +1,3 @@
|
||||
__version__ = "9.2.0"
|
||||
__version__ = "9.3.0"
|
||||
|
||||
USER_AGENT = f"parsedmarc/{__version__}"
|
||||
|
||||
@@ -3,9 +3,7 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import logging.handlers
|
||||
import threading
|
||||
from typing import Any
|
||||
|
||||
from pygelf import GelfTcpHandler, GelfTlsHandler, GelfUdpHandler
|
||||
|
||||
@@ -14,6 +12,7 @@ from parsedmarc import (
|
||||
parsed_forensic_reports_to_csv_rows,
|
||||
parsed_smtp_tls_reports_to_csv_rows,
|
||||
)
|
||||
from parsedmarc.types import AggregateReport, ForensicReport, SMTPTLSReport
|
||||
|
||||
log_context_data = threading.local()
|
||||
|
||||
@@ -37,7 +36,7 @@ class GelfClient(object):
|
||||
"""
|
||||
self.host = host
|
||||
self.port = port
|
||||
self.logger = logging.getLogger("parsedmarc_syslog")
|
||||
self.logger = logging.getLogger("parsedmarc_gelf")
|
||||
self.logger.setLevel(logging.INFO)
|
||||
self.logger.addFilter(ContextFilter())
|
||||
self.gelf_mode = {
|
||||
@@ -50,7 +49,7 @@ class GelfClient(object):
|
||||
)
|
||||
self.logger.addHandler(self.handler)
|
||||
|
||||
def save_aggregate_report_to_gelf(self, aggregate_reports: list[dict[str, Any]]):
|
||||
def save_aggregate_report_to_gelf(self, aggregate_reports: list[AggregateReport]):
|
||||
rows = parsed_aggregate_reports_to_csv_rows(aggregate_reports)
|
||||
for row in rows:
|
||||
log_context_data.parsedmarc = row
|
||||
@@ -58,14 +57,19 @@ class GelfClient(object):
|
||||
|
||||
log_context_data.parsedmarc = None
|
||||
|
||||
def save_forensic_report_to_gelf(self, forensic_reports: list[dict[str, Any]]):
|
||||
def save_forensic_report_to_gelf(self, forensic_reports: list[ForensicReport]):
|
||||
rows = parsed_forensic_reports_to_csv_rows(forensic_reports)
|
||||
for row in rows:
|
||||
log_context_data.parsedmarc = row
|
||||
self.logger.info("parsedmarc forensic report")
|
||||
|
||||
def save_smtp_tls_report_to_gelf(self, smtp_tls_reports: dict[str, Any]):
|
||||
def save_smtp_tls_report_to_gelf(self, smtp_tls_reports: SMTPTLSReport):
|
||||
rows = parsed_smtp_tls_reports_to_csv_rows(smtp_tls_reports)
|
||||
for row in rows:
|
||||
log_context_data.parsedmarc = row
|
||||
self.logger.info("parsedmarc smtptls report")
|
||||
|
||||
def close(self):
|
||||
"""Remove and close the GELF handler, releasing its connection."""
|
||||
self.logger.removeHandler(self.handler)
|
||||
self.handler.close()
|
||||
|
||||
@@ -62,6 +62,10 @@ class KafkaClient(object):
|
||||
except NoBrokersAvailable:
|
||||
raise KafkaError("No Kafka brokers available")
|
||||
|
||||
def close(self):
|
||||
"""Close the Kafka producer, releasing background threads and sockets."""
|
||||
self.producer.close()
|
||||
|
||||
@staticmethod
|
||||
def strip_metadata(report: dict[str, Any]):
|
||||
"""
|
||||
|
||||
@@ -175,10 +175,14 @@ class GmailConnection(MailboxConnection):
|
||||
# Not needed
|
||||
pass
|
||||
|
||||
def watch(self, check_callback, check_timeout):
|
||||
def watch(self, check_callback, check_timeout, config_reloading=None):
|
||||
"""Checks the mailbox for new messages every n seconds"""
|
||||
while True:
|
||||
if config_reloading and config_reloading():
|
||||
return
|
||||
sleep(check_timeout)
|
||||
if config_reloading and config_reloading():
|
||||
return
|
||||
check_callback(self)
|
||||
|
||||
@lru_cache(maxsize=10)
|
||||
|
||||
@@ -278,10 +278,14 @@ class MSGraphConnection(MailboxConnection):
|
||||
# Not needed
|
||||
pass
|
||||
|
||||
def watch(self, check_callback, check_timeout):
|
||||
def watch(self, check_callback, check_timeout, config_reloading=None):
|
||||
"""Checks the mailbox for new messages every n seconds"""
|
||||
while True:
|
||||
if config_reloading and config_reloading():
|
||||
return
|
||||
sleep(check_timeout)
|
||||
if config_reloading and config_reloading():
|
||||
return
|
||||
check_callback(self)
|
||||
|
||||
@lru_cache(maxsize=10)
|
||||
|
||||
@@ -81,7 +81,7 @@ class IMAPConnection(MailboxConnection):
|
||||
def keepalive(self):
|
||||
self._client.noop()
|
||||
|
||||
def watch(self, check_callback, check_timeout):
|
||||
def watch(self, check_callback, check_timeout, config_reloading=None):
|
||||
"""
|
||||
Use an IDLE IMAP connection to parse incoming emails,
|
||||
and pass the results to a callback function
|
||||
@@ -94,6 +94,8 @@ class IMAPConnection(MailboxConnection):
|
||||
check_callback(self)
|
||||
|
||||
while True:
|
||||
if config_reloading and config_reloading():
|
||||
return
|
||||
try:
|
||||
IMAPClient(
|
||||
host=self._client.host,
|
||||
@@ -111,3 +113,5 @@ class IMAPConnection(MailboxConnection):
|
||||
except Exception as e:
|
||||
logger.warning("IMAP connection error. {0}. Reconnecting...".format(e))
|
||||
sleep(check_timeout)
|
||||
if config_reloading and config_reloading():
|
||||
return
|
||||
|
||||
@@ -28,5 +28,5 @@ class MailboxConnection(ABC):
|
||||
def keepalive(self):
|
||||
raise NotImplementedError
|
||||
|
||||
def watch(self, check_callback, check_timeout):
|
||||
def watch(self, check_callback, check_timeout, config_reloading=None):
|
||||
raise NotImplementedError
|
||||
|
||||
@@ -63,10 +63,14 @@ class MaildirConnection(MailboxConnection):
|
||||
def keepalive(self):
|
||||
return
|
||||
|
||||
def watch(self, check_callback, check_timeout):
|
||||
def watch(self, check_callback, check_timeout, config_reloading=None):
|
||||
while True:
|
||||
if config_reloading and config_reloading():
|
||||
return
|
||||
try:
|
||||
check_callback(self)
|
||||
except Exception as e:
|
||||
logger.warning("Maildir init error. {0}".format(e))
|
||||
if config_reloading and config_reloading():
|
||||
return
|
||||
sleep(check_timeout)
|
||||
|
||||
@@ -316,9 +316,7 @@ def set_hosts(
|
||||
raise OpenSearchError(
|
||||
"Unable to load AWS credentials for OpenSearch SigV4 authentication"
|
||||
)
|
||||
conn_params["http_auth"] = AWSV4SignerAuth(
|
||||
credentials, aws_region, aws_service
|
||||
)
|
||||
conn_params["http_auth"] = AWSV4SignerAuth(credentials, aws_region, aws_service)
|
||||
conn_params["connection_class"] = RequestsHttpConnection
|
||||
elif normalized_auth_type == "basic":
|
||||
if username and password:
|
||||
|
||||
Binary file not shown.
@@ -93,3 +93,11 @@ class S3Client(object):
|
||||
self.bucket.put_object(
|
||||
Body=json.dumps(report), Key=object_path, Metadata=object_metadata
|
||||
)
|
||||
|
||||
def close(self):
|
||||
"""Clean up the boto3 resource."""
|
||||
try:
|
||||
if self.s3.meta is not None:
|
||||
self.s3.meta.client.close()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
@@ -207,3 +207,7 @@ class HECClient(object):
|
||||
raise SplunkError(e.__str__())
|
||||
if response["code"] != 0:
|
||||
raise SplunkError(response["text"])
|
||||
|
||||
def close(self):
|
||||
"""Close the underlying HTTP session."""
|
||||
self.session.close()
|
||||
|
||||
@@ -57,7 +57,7 @@ class SyslogClient(object):
|
||||
self.logger.setLevel(logging.INFO)
|
||||
|
||||
# Create the appropriate syslog handler based on protocol
|
||||
log_handler = self._create_syslog_handler(
|
||||
self.log_handler = self._create_syslog_handler(
|
||||
server_name,
|
||||
server_port,
|
||||
self.protocol,
|
||||
@@ -69,7 +69,7 @@ class SyslogClient(object):
|
||||
retry_delay,
|
||||
)
|
||||
|
||||
self.logger.addHandler(log_handler)
|
||||
self.logger.addHandler(self.log_handler)
|
||||
|
||||
def _create_syslog_handler(
|
||||
self,
|
||||
@@ -179,3 +179,8 @@ class SyslogClient(object):
|
||||
rows = parsed_smtp_tls_reports_to_csv_rows(smtp_tls_reports)
|
||||
for row in rows:
|
||||
self.logger.info(json.dumps(row))
|
||||
|
||||
def close(self):
|
||||
"""Remove and close the syslog handler, releasing its socket."""
|
||||
self.logger.removeHandler(self.log_handler)
|
||||
self.log_handler.close()
|
||||
|
||||
@@ -205,8 +205,7 @@ def get_reverse_dns(
|
||||
)[0]
|
||||
|
||||
except dns.exception.DNSException as e:
|
||||
logger.warning(f"get_reverse_dns({ip_address}) exception: {e}")
|
||||
pass
|
||||
logger.debug(f"get_reverse_dns({ip_address}) exception: {e}")
|
||||
|
||||
return hostname
|
||||
|
||||
|
||||
@@ -63,3 +63,7 @@ class WebhookClient(object):
|
||||
self.session.post(webhook_url, data=payload, timeout=self.timeout)
|
||||
except Exception as error_:
|
||||
logger.error("Webhook Error: {0}".format(error_.__str__()))
|
||||
|
||||
def close(self):
|
||||
"""Close the underlying HTTP session."""
|
||||
self.session.close()
|
||||
|
||||
Reference in New Issue
Block a user