mirror of
https://github.com/domainaware/parsedmarc.git
synced 2026-03-24 07:22:45 +00:00
Compare commits
6 Commits
config-rel
...
9.4.0
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
2032438d3b | ||
|
|
1e95c5d30b | ||
|
|
cb2384be83 | ||
|
|
9a5b5310fa | ||
|
|
9849598100 | ||
|
|
e82f3e58a1 |
31
CHANGELOG.md
31
CHANGELOG.md
@@ -1,5 +1,36 @@
|
||||
# Changelog
|
||||
|
||||
## 9.4.0
|
||||
|
||||
### Added
|
||||
|
||||
- Extracted `load_reverse_dns_map()` utility function in `utils.py` for loading the reverse DNS map independently of individual IP lookups.
|
||||
- SIGHUP reload now re-downloads/reloads the reverse DNS map, so changes take effect without restarting.
|
||||
- Add premade OpenSearch index patterns, visualizations, and dashboards
|
||||
|
||||
### Changed
|
||||
|
||||
- When `index_prefix_domain_map` is configured, SMTP TLS reports for domains not in the map are now silently dropped instead of being output. Unlike DMARC, TLS-RPT has no DNS authorization records, so this filtering prevents processing reports for unrelated domains.
|
||||
- Bump OpenSearch support to `< 4`
|
||||
|
||||
### Fixed
|
||||
|
||||
- Fixed `get_index_prefix` using wrong key (`domain` instead of `policy_domain`) for SMTP TLS reports, which prevented domain map matching from working for TLS reports.
|
||||
- Domain matching in `get_index_prefix` now lowercases the domain for case-insensitive comparison.
|
||||
|
||||
## 9.3.1
|
||||
|
||||
### Breaking changes
|
||||
|
||||
- Elasticsearch and OpenSearch now verify SSL certificates by default when `ssl = True`, even without a `cert_path`
|
||||
- Added `skip_certificate_verification` option to the `elasticsearch` and `opensearch` configuration sections for consistency with `splunk_hec`
|
||||
|
||||
### Fixed
|
||||
|
||||
- Splunk HEC `skip_certificate_verification` now works correctly
|
||||
- SMTP TLS reports no longer fail when saving to multiple output targets (e.g. Elasticsearch and OpenSearch) due to in-place mutation of the report dict
|
||||
- Output client initialization errors now identify which module failed (e.g. "OpenSearch: ConnectionError..." instead of generic "Output client error")
|
||||
|
||||
## 9.3.0
|
||||
|
||||
### Added
|
||||
|
||||
@@ -15,7 +15,7 @@ services:
|
||||
condition: service_healthy
|
||||
|
||||
opensearch-dashboards:
|
||||
image: opensearchproject/opensearch-dashboards:2
|
||||
image: opensearchproject/opensearch-dashboards:3
|
||||
environment:
|
||||
- OPENSEARCH_HOSTS=["https://opensearch:9200"]
|
||||
ports:
|
||||
@@ -27,7 +27,7 @@ services:
|
||||
grafana:
|
||||
image: grafana/grafana:latest
|
||||
environment:
|
||||
- GF_SECURITY_ADMIN_PASSWORD=${GRAFANA_PASSWORD}
|
||||
- GRAFANA_PASSWORD=${GRAFANA_PASSWORD}
|
||||
- GF_INSTALL_PLUGINS=grafana-piechart-panel,grafana-worldmap-panel
|
||||
ports:
|
||||
- "127.0.0.1:3000:3000"
|
||||
@@ -41,5 +41,7 @@ services:
|
||||
- SPLUNK_START_ARGS=--accept-license
|
||||
- "SPLUNK_GENERAL_TERMS=--accept-sgt-current-at-splunk-com"
|
||||
- SPLUNK_PASSWORD=${SPLUNK_PASSWORD}
|
||||
- SPLUNK_HEC_TOKEN=${SPLUNK_HEC_TOKEN}
|
||||
ports:
|
||||
- "127.0.0.1:8000:8000"
|
||||
- "127.0.0.1:8088:8088"
|
||||
|
||||
@@ -273,6 +273,8 @@ The full set of configuration options are:
|
||||
(Default: `True`)
|
||||
- `timeout` - float: Timeout in seconds (Default: 60)
|
||||
- `cert_path` - str: Path to a trusted certificates
|
||||
- `skip_certificate_verification` - bool: Skip certificate
|
||||
verification (not recommended)
|
||||
- `index_suffix` - str: A suffix to apply to the index names
|
||||
- `index_prefix` - str: A prefix to apply to the index names
|
||||
- `monthly_indexes` - bool: Use monthly indexes instead of daily indexes
|
||||
@@ -300,6 +302,8 @@ The full set of configuration options are:
|
||||
(Default: `True`)
|
||||
- `timeout` - float: Timeout in seconds (Default: 60)
|
||||
- `cert_path` - str: Path to a trusted certificates
|
||||
- `skip_certificate_verification` - bool: Skip certificate
|
||||
verification (not recommended)
|
||||
- `index_suffix` - str: A suffix to apply to the index names
|
||||
- `index_prefix` - str: A prefix to apply to the index names
|
||||
- `monthly_indexes` - bool: Use monthly indexes instead of daily indexes
|
||||
|
||||
28
opensearch/opensearch_dashboards.ndjson
Normal file
28
opensearch/opensearch_dashboards.ndjson
Normal file
File diff suppressed because one or more lines are too long
@@ -19,6 +19,7 @@ import yaml
|
||||
from tqdm import tqdm
|
||||
|
||||
from parsedmarc import (
|
||||
REVERSE_DNS_MAP,
|
||||
SEEN_AGGREGATE_REPORT_IDS,
|
||||
InvalidDMARCReport,
|
||||
ParserError,
|
||||
@@ -48,7 +49,12 @@ from parsedmarc.mail import (
|
||||
)
|
||||
from parsedmarc.mail.graph import AuthMethod
|
||||
from parsedmarc.types import ParsingResults
|
||||
from parsedmarc.utils import get_base_domain, get_reverse_dns, is_mbox
|
||||
from parsedmarc.utils import (
|
||||
get_base_domain,
|
||||
get_reverse_dns,
|
||||
is_mbox,
|
||||
load_reverse_dns_map,
|
||||
)
|
||||
|
||||
# Increase the max header limit for very large emails. `_MAXHEADERS` is a
|
||||
# private stdlib attribute and may not exist in type stubs.
|
||||
@@ -505,6 +511,10 @@ def _parse_config_file(config_file, opts):
|
||||
opts.elasticsearch_ssl = bool(elasticsearch_config.getboolean("ssl"))
|
||||
if "cert_path" in elasticsearch_config:
|
||||
opts.elasticsearch_ssl_cert_path = elasticsearch_config["cert_path"]
|
||||
if "skip_certificate_verification" in elasticsearch_config:
|
||||
opts.elasticsearch_skip_certificate_verification = bool(
|
||||
elasticsearch_config.getboolean("skip_certificate_verification")
|
||||
)
|
||||
if "user" in elasticsearch_config:
|
||||
opts.elasticsearch_username = elasticsearch_config["user"]
|
||||
if "password" in elasticsearch_config:
|
||||
@@ -544,6 +554,10 @@ def _parse_config_file(config_file, opts):
|
||||
opts.opensearch_ssl = bool(opensearch_config.getboolean("ssl"))
|
||||
if "cert_path" in opensearch_config:
|
||||
opts.opensearch_ssl_cert_path = opensearch_config["cert_path"]
|
||||
if "skip_certificate_verification" in opensearch_config:
|
||||
opts.opensearch_skip_certificate_verification = bool(
|
||||
opensearch_config.getboolean("skip_certificate_verification")
|
||||
)
|
||||
if "user" in opensearch_config:
|
||||
opts.opensearch_username = opensearch_config["user"]
|
||||
if "password" in opensearch_config:
|
||||
@@ -853,77 +867,95 @@ def _init_output_clients(opts):
|
||||
"""
|
||||
clients = {}
|
||||
|
||||
if opts.s3_bucket:
|
||||
clients["s3_client"] = s3.S3Client(
|
||||
bucket_name=opts.s3_bucket,
|
||||
bucket_path=opts.s3_path,
|
||||
region_name=opts.s3_region_name,
|
||||
endpoint_url=opts.s3_endpoint_url,
|
||||
access_key_id=opts.s3_access_key_id,
|
||||
secret_access_key=opts.s3_secret_access_key,
|
||||
)
|
||||
try:
|
||||
if opts.s3_bucket:
|
||||
clients["s3_client"] = s3.S3Client(
|
||||
bucket_name=opts.s3_bucket,
|
||||
bucket_path=opts.s3_path,
|
||||
region_name=opts.s3_region_name,
|
||||
endpoint_url=opts.s3_endpoint_url,
|
||||
access_key_id=opts.s3_access_key_id,
|
||||
secret_access_key=opts.s3_secret_access_key,
|
||||
)
|
||||
except Exception as e:
|
||||
raise RuntimeError(f"S3: {e}") from e
|
||||
|
||||
if opts.syslog_server:
|
||||
clients["syslog_client"] = syslog.SyslogClient(
|
||||
server_name=opts.syslog_server,
|
||||
server_port=int(opts.syslog_port),
|
||||
protocol=opts.syslog_protocol or "udp",
|
||||
cafile_path=opts.syslog_cafile_path,
|
||||
certfile_path=opts.syslog_certfile_path,
|
||||
keyfile_path=opts.syslog_keyfile_path,
|
||||
timeout=opts.syslog_timeout if opts.syslog_timeout is not None else 5.0,
|
||||
retry_attempts=opts.syslog_retry_attempts
|
||||
if opts.syslog_retry_attempts is not None
|
||||
else 3,
|
||||
retry_delay=opts.syslog_retry_delay
|
||||
if opts.syslog_retry_delay is not None
|
||||
else 5,
|
||||
)
|
||||
try:
|
||||
if opts.syslog_server:
|
||||
clients["syslog_client"] = syslog.SyslogClient(
|
||||
server_name=opts.syslog_server,
|
||||
server_port=int(opts.syslog_port),
|
||||
protocol=opts.syslog_protocol or "udp",
|
||||
cafile_path=opts.syslog_cafile_path,
|
||||
certfile_path=opts.syslog_certfile_path,
|
||||
keyfile_path=opts.syslog_keyfile_path,
|
||||
timeout=opts.syslog_timeout if opts.syslog_timeout is not None else 5.0,
|
||||
retry_attempts=opts.syslog_retry_attempts
|
||||
if opts.syslog_retry_attempts is not None
|
||||
else 3,
|
||||
retry_delay=opts.syslog_retry_delay
|
||||
if opts.syslog_retry_delay is not None
|
||||
else 5,
|
||||
)
|
||||
except Exception as e:
|
||||
raise RuntimeError(f"Syslog: {e}") from e
|
||||
|
||||
if opts.hec:
|
||||
if opts.hec_token is None or opts.hec_index is None:
|
||||
raise ConfigurationError(
|
||||
"HEC token and HEC index are required when using HEC URL"
|
||||
)
|
||||
verify = True
|
||||
if opts.hec_skip_certificate_verification:
|
||||
verify = False
|
||||
clients["hec_client"] = splunk.HECClient(
|
||||
opts.hec, opts.hec_token, opts.hec_index, verify=verify
|
||||
)
|
||||
try:
|
||||
verify = True
|
||||
if opts.hec_skip_certificate_verification:
|
||||
verify = False
|
||||
clients["hec_client"] = splunk.HECClient(
|
||||
opts.hec, opts.hec_token, opts.hec_index, verify=verify
|
||||
)
|
||||
except Exception as e:
|
||||
raise RuntimeError(f"Splunk HEC: {e}") from e
|
||||
|
||||
if opts.kafka_hosts:
|
||||
ssl_context = None
|
||||
if opts.kafka_skip_certificate_verification:
|
||||
logger.debug("Skipping Kafka certificate verification")
|
||||
ssl_context = create_default_context()
|
||||
ssl_context.check_hostname = False
|
||||
ssl_context.verify_mode = CERT_NONE
|
||||
clients["kafka_client"] = kafkaclient.KafkaClient(
|
||||
opts.kafka_hosts,
|
||||
username=opts.kafka_username,
|
||||
password=opts.kafka_password,
|
||||
ssl_context=ssl_context,
|
||||
)
|
||||
try:
|
||||
if opts.kafka_hosts:
|
||||
ssl_context = None
|
||||
if opts.kafka_skip_certificate_verification:
|
||||
logger.debug("Skipping Kafka certificate verification")
|
||||
ssl_context = create_default_context()
|
||||
ssl_context.check_hostname = False
|
||||
ssl_context.verify_mode = CERT_NONE
|
||||
clients["kafka_client"] = kafkaclient.KafkaClient(
|
||||
opts.kafka_hosts,
|
||||
username=opts.kafka_username,
|
||||
password=opts.kafka_password,
|
||||
ssl_context=ssl_context,
|
||||
)
|
||||
except Exception as e:
|
||||
raise RuntimeError(f"Kafka: {e}") from e
|
||||
|
||||
if opts.gelf_host:
|
||||
clients["gelf_client"] = gelf.GelfClient(
|
||||
host=opts.gelf_host,
|
||||
port=int(opts.gelf_port),
|
||||
mode=opts.gelf_mode,
|
||||
)
|
||||
try:
|
||||
if opts.gelf_host:
|
||||
clients["gelf_client"] = gelf.GelfClient(
|
||||
host=opts.gelf_host,
|
||||
port=int(opts.gelf_port),
|
||||
mode=opts.gelf_mode,
|
||||
)
|
||||
except Exception as e:
|
||||
raise RuntimeError(f"GELF: {e}") from e
|
||||
|
||||
if (
|
||||
opts.webhook_aggregate_url
|
||||
or opts.webhook_forensic_url
|
||||
or opts.webhook_smtp_tls_url
|
||||
):
|
||||
clients["webhook_client"] = webhook.WebhookClient(
|
||||
aggregate_url=opts.webhook_aggregate_url,
|
||||
forensic_url=opts.webhook_forensic_url,
|
||||
smtp_tls_url=opts.webhook_smtp_tls_url,
|
||||
timeout=opts.webhook_timeout,
|
||||
)
|
||||
try:
|
||||
if (
|
||||
opts.webhook_aggregate_url
|
||||
or opts.webhook_forensic_url
|
||||
or opts.webhook_smtp_tls_url
|
||||
):
|
||||
clients["webhook_client"] = webhook.WebhookClient(
|
||||
aggregate_url=opts.webhook_aggregate_url,
|
||||
forensic_url=opts.webhook_forensic_url,
|
||||
smtp_tls_url=opts.webhook_smtp_tls_url,
|
||||
timeout=opts.webhook_timeout,
|
||||
)
|
||||
except Exception as e:
|
||||
raise RuntimeError(f"Webhook: {e}") from e
|
||||
|
||||
# Elasticsearch and OpenSearch mutate module-level global state via
|
||||
# connections.create_connection(), which cannot be rolled back if a later
|
||||
@@ -931,76 +963,84 @@ def _init_output_clients(opts):
|
||||
# successfully first; this minimises the window for partial-init problems
|
||||
# during config reload.
|
||||
if opts.save_aggregate or opts.save_forensic or opts.save_smtp_tls:
|
||||
if opts.elasticsearch_hosts:
|
||||
es_aggregate_index = "dmarc_aggregate"
|
||||
es_forensic_index = "dmarc_forensic"
|
||||
es_smtp_tls_index = "smtp_tls"
|
||||
if opts.elasticsearch_index_suffix:
|
||||
suffix = opts.elasticsearch_index_suffix
|
||||
es_aggregate_index = "{0}_{1}".format(es_aggregate_index, suffix)
|
||||
es_forensic_index = "{0}_{1}".format(es_forensic_index, suffix)
|
||||
es_smtp_tls_index = "{0}_{1}".format(es_smtp_tls_index, suffix)
|
||||
if opts.elasticsearch_index_prefix:
|
||||
prefix = opts.elasticsearch_index_prefix
|
||||
es_aggregate_index = "{0}{1}".format(prefix, es_aggregate_index)
|
||||
es_forensic_index = "{0}{1}".format(prefix, es_forensic_index)
|
||||
es_smtp_tls_index = "{0}{1}".format(prefix, es_smtp_tls_index)
|
||||
elastic_timeout_value = (
|
||||
float(opts.elasticsearch_timeout)
|
||||
if opts.elasticsearch_timeout is not None
|
||||
else 60.0
|
||||
)
|
||||
elastic.set_hosts(
|
||||
opts.elasticsearch_hosts,
|
||||
use_ssl=opts.elasticsearch_ssl,
|
||||
ssl_cert_path=opts.elasticsearch_ssl_cert_path,
|
||||
username=opts.elasticsearch_username,
|
||||
password=opts.elasticsearch_password,
|
||||
api_key=opts.elasticsearch_api_key,
|
||||
timeout=elastic_timeout_value,
|
||||
)
|
||||
elastic.migrate_indexes(
|
||||
aggregate_indexes=[es_aggregate_index],
|
||||
forensic_indexes=[es_forensic_index],
|
||||
)
|
||||
clients["elasticsearch"] = _ElasticsearchHandle()
|
||||
try:
|
||||
if opts.elasticsearch_hosts:
|
||||
es_aggregate_index = "dmarc_aggregate"
|
||||
es_forensic_index = "dmarc_forensic"
|
||||
es_smtp_tls_index = "smtp_tls"
|
||||
if opts.elasticsearch_index_suffix:
|
||||
suffix = opts.elasticsearch_index_suffix
|
||||
es_aggregate_index = "{0}_{1}".format(es_aggregate_index, suffix)
|
||||
es_forensic_index = "{0}_{1}".format(es_forensic_index, suffix)
|
||||
es_smtp_tls_index = "{0}_{1}".format(es_smtp_tls_index, suffix)
|
||||
if opts.elasticsearch_index_prefix:
|
||||
prefix = opts.elasticsearch_index_prefix
|
||||
es_aggregate_index = "{0}{1}".format(prefix, es_aggregate_index)
|
||||
es_forensic_index = "{0}{1}".format(prefix, es_forensic_index)
|
||||
es_smtp_tls_index = "{0}{1}".format(prefix, es_smtp_tls_index)
|
||||
elastic_timeout_value = (
|
||||
float(opts.elasticsearch_timeout)
|
||||
if opts.elasticsearch_timeout is not None
|
||||
else 60.0
|
||||
)
|
||||
elastic.set_hosts(
|
||||
opts.elasticsearch_hosts,
|
||||
use_ssl=opts.elasticsearch_ssl,
|
||||
ssl_cert_path=opts.elasticsearch_ssl_cert_path,
|
||||
skip_certificate_verification=opts.elasticsearch_skip_certificate_verification,
|
||||
username=opts.elasticsearch_username,
|
||||
password=opts.elasticsearch_password,
|
||||
api_key=opts.elasticsearch_api_key,
|
||||
timeout=elastic_timeout_value,
|
||||
)
|
||||
elastic.migrate_indexes(
|
||||
aggregate_indexes=[es_aggregate_index],
|
||||
forensic_indexes=[es_forensic_index],
|
||||
)
|
||||
clients["elasticsearch"] = _ElasticsearchHandle()
|
||||
except Exception as e:
|
||||
raise RuntimeError(f"Elasticsearch: {e}") from e
|
||||
|
||||
if opts.opensearch_hosts:
|
||||
os_aggregate_index = "dmarc_aggregate"
|
||||
os_forensic_index = "dmarc_forensic"
|
||||
os_smtp_tls_index = "smtp_tls"
|
||||
if opts.opensearch_index_suffix:
|
||||
suffix = opts.opensearch_index_suffix
|
||||
os_aggregate_index = "{0}_{1}".format(os_aggregate_index, suffix)
|
||||
os_forensic_index = "{0}_{1}".format(os_forensic_index, suffix)
|
||||
os_smtp_tls_index = "{0}_{1}".format(os_smtp_tls_index, suffix)
|
||||
if opts.opensearch_index_prefix:
|
||||
prefix = opts.opensearch_index_prefix
|
||||
os_aggregate_index = "{0}{1}".format(prefix, os_aggregate_index)
|
||||
os_forensic_index = "{0}{1}".format(prefix, os_forensic_index)
|
||||
os_smtp_tls_index = "{0}{1}".format(prefix, os_smtp_tls_index)
|
||||
opensearch_timeout_value = (
|
||||
float(opts.opensearch_timeout)
|
||||
if opts.opensearch_timeout is not None
|
||||
else 60.0
|
||||
)
|
||||
opensearch.set_hosts(
|
||||
opts.opensearch_hosts,
|
||||
use_ssl=opts.opensearch_ssl,
|
||||
ssl_cert_path=opts.opensearch_ssl_cert_path,
|
||||
username=opts.opensearch_username,
|
||||
password=opts.opensearch_password,
|
||||
api_key=opts.opensearch_api_key,
|
||||
timeout=opensearch_timeout_value,
|
||||
auth_type=opts.opensearch_auth_type,
|
||||
aws_region=opts.opensearch_aws_region,
|
||||
aws_service=opts.opensearch_aws_service,
|
||||
)
|
||||
opensearch.migrate_indexes(
|
||||
aggregate_indexes=[os_aggregate_index],
|
||||
forensic_indexes=[os_forensic_index],
|
||||
)
|
||||
clients["opensearch"] = _OpenSearchHandle()
|
||||
try:
|
||||
if opts.opensearch_hosts:
|
||||
os_aggregate_index = "dmarc_aggregate"
|
||||
os_forensic_index = "dmarc_forensic"
|
||||
os_smtp_tls_index = "smtp_tls"
|
||||
if opts.opensearch_index_suffix:
|
||||
suffix = opts.opensearch_index_suffix
|
||||
os_aggregate_index = "{0}_{1}".format(os_aggregate_index, suffix)
|
||||
os_forensic_index = "{0}_{1}".format(os_forensic_index, suffix)
|
||||
os_smtp_tls_index = "{0}_{1}".format(os_smtp_tls_index, suffix)
|
||||
if opts.opensearch_index_prefix:
|
||||
prefix = opts.opensearch_index_prefix
|
||||
os_aggregate_index = "{0}{1}".format(prefix, os_aggregate_index)
|
||||
os_forensic_index = "{0}{1}".format(prefix, os_forensic_index)
|
||||
os_smtp_tls_index = "{0}{1}".format(prefix, os_smtp_tls_index)
|
||||
opensearch_timeout_value = (
|
||||
float(opts.opensearch_timeout)
|
||||
if opts.opensearch_timeout is not None
|
||||
else 60.0
|
||||
)
|
||||
opensearch.set_hosts(
|
||||
opts.opensearch_hosts,
|
||||
use_ssl=opts.opensearch_ssl,
|
||||
ssl_cert_path=opts.opensearch_ssl_cert_path,
|
||||
skip_certificate_verification=opts.opensearch_skip_certificate_verification,
|
||||
username=opts.opensearch_username,
|
||||
password=opts.opensearch_password,
|
||||
api_key=opts.opensearch_api_key,
|
||||
timeout=opensearch_timeout_value,
|
||||
auth_type=opts.opensearch_auth_type,
|
||||
aws_region=opts.opensearch_aws_region,
|
||||
aws_service=opts.opensearch_aws_service,
|
||||
)
|
||||
opensearch.migrate_indexes(
|
||||
aggregate_indexes=[os_aggregate_index],
|
||||
forensic_indexes=[os_forensic_index],
|
||||
)
|
||||
clients["opensearch"] = _OpenSearchHandle()
|
||||
except Exception as e:
|
||||
raise RuntimeError(f"OpenSearch: {e}") from e
|
||||
|
||||
return clients
|
||||
|
||||
@@ -1034,20 +1074,22 @@ def _main():
|
||||
elif "reported_domain" in report:
|
||||
domain = report["reported_domain"]
|
||||
elif "policies" in report:
|
||||
domain = report["policies"][0]["domain"]
|
||||
domain = report["policies"][0]["policy_domain"]
|
||||
if domain:
|
||||
domain = get_base_domain(domain)
|
||||
for prefix in index_prefix_domain_map:
|
||||
if domain in index_prefix_domain_map[prefix]:
|
||||
prefix = (
|
||||
prefix.lower()
|
||||
.strip()
|
||||
.strip("_")
|
||||
.replace(" ", "_")
|
||||
.replace("-", "_")
|
||||
)
|
||||
prefix = f"{prefix}_"
|
||||
return prefix
|
||||
if domain:
|
||||
domain = domain.lower()
|
||||
for prefix in index_prefix_domain_map:
|
||||
if domain in index_prefix_domain_map[prefix]:
|
||||
prefix = (
|
||||
prefix.lower()
|
||||
.strip()
|
||||
.strip("_")
|
||||
.replace(" ", "_")
|
||||
.replace("-", "_")
|
||||
)
|
||||
prefix = f"{prefix}_"
|
||||
return prefix
|
||||
return None
|
||||
|
||||
def process_reports(reports_):
|
||||
@@ -1058,6 +1100,22 @@ def _main():
|
||||
logger.error(message)
|
||||
output_errors.append(message)
|
||||
|
||||
if index_prefix_domain_map is not None:
|
||||
filtered_tls = []
|
||||
for report in reports_.get("smtp_tls_reports", []):
|
||||
if get_index_prefix(report) is not None:
|
||||
filtered_tls.append(report)
|
||||
else:
|
||||
domain = "unknown"
|
||||
if "policies" in report and report["policies"]:
|
||||
domain = report["policies"][0].get("policy_domain", "unknown")
|
||||
logger.debug(
|
||||
"Ignoring SMTP TLS report for domain not in "
|
||||
"index_prefix_domain_map: %s",
|
||||
domain,
|
||||
)
|
||||
reports_["smtp_tls_reports"] = filtered_tls
|
||||
|
||||
indent_value = 2 if opts.prettify_json else None
|
||||
output_str = "{0}\n".format(
|
||||
json.dumps(reports_, ensure_ascii=False, indent=indent_value)
|
||||
@@ -1529,6 +1587,7 @@ def _main():
|
||||
elasticsearch_index_prefix=None,
|
||||
elasticsearch_ssl=True,
|
||||
elasticsearch_ssl_cert_path=None,
|
||||
elasticsearch_skip_certificate_verification=False,
|
||||
elasticsearch_monthly_indexes=False,
|
||||
elasticsearch_username=None,
|
||||
elasticsearch_password=None,
|
||||
@@ -1541,6 +1600,7 @@ def _main():
|
||||
opensearch_index_prefix=None,
|
||||
opensearch_ssl=True,
|
||||
opensearch_ssl_cert_path=None,
|
||||
opensearch_skip_certificate_verification=False,
|
||||
opensearch_monthly_indexes=False,
|
||||
opensearch_username=None,
|
||||
opensearch_password=None,
|
||||
@@ -1666,12 +1726,6 @@ def _main():
|
||||
# Initialize output clients
|
||||
try:
|
||||
clients = _init_output_clients(opts)
|
||||
except elastic.ElasticsearchError as e:
|
||||
logger.exception("Elasticsearch Error: {0}".format(e))
|
||||
exit(1)
|
||||
except opensearch.OpenSearchError as e:
|
||||
logger.exception("OpenSearch Error: {0}".format(e))
|
||||
exit(1)
|
||||
except ConfigurationError as e:
|
||||
logger.critical(str(e))
|
||||
exit(1)
|
||||
@@ -2057,6 +2111,17 @@ def _main():
|
||||
_close_output_clients(clients)
|
||||
clients = new_clients
|
||||
index_prefix_domain_map = new_index_prefix_domain_map
|
||||
|
||||
# Reload the reverse DNS map so changes to the
|
||||
# map path/URL in the config take effect.
|
||||
load_reverse_dns_map(
|
||||
REVERSE_DNS_MAP,
|
||||
always_use_local_file=new_opts.always_use_local_files,
|
||||
local_file_path=new_opts.reverse_dns_map_path,
|
||||
url=new_opts.reverse_dns_map_url,
|
||||
offline=new_opts.offline,
|
||||
)
|
||||
|
||||
for k, v in vars(new_opts).items():
|
||||
setattr(opts, k, v)
|
||||
|
||||
|
||||
@@ -1,3 +1,3 @@
|
||||
__version__ = "9.3.0"
|
||||
__version__ = "9.4.0"
|
||||
|
||||
USER_AGENT = f"parsedmarc/{__version__}"
|
||||
|
||||
@@ -268,6 +268,7 @@ def set_hosts(
|
||||
*,
|
||||
use_ssl: bool = False,
|
||||
ssl_cert_path: Optional[str] = None,
|
||||
skip_certificate_verification: bool = False,
|
||||
username: Optional[str] = None,
|
||||
password: Optional[str] = None,
|
||||
api_key: Optional[str] = None,
|
||||
@@ -280,6 +281,7 @@ def set_hosts(
|
||||
hosts (str | list[str]): A single hostname or URL, or list of hostnames or URLs
|
||||
use_ssl (bool): Use an HTTPS connection to the server
|
||||
ssl_cert_path (str): Path to the certificate chain
|
||||
skip_certificate_verification (bool): Skip certificate verification
|
||||
username (str): The username to use for authentication
|
||||
password (str): The password to use for authentication
|
||||
api_key (str): The Base64 encoded API key to use for authentication
|
||||
@@ -291,10 +293,11 @@ def set_hosts(
|
||||
if use_ssl:
|
||||
conn_params["use_ssl"] = True
|
||||
if ssl_cert_path:
|
||||
conn_params["verify_certs"] = True
|
||||
conn_params["ca_certs"] = ssl_cert_path
|
||||
else:
|
||||
if skip_certificate_verification:
|
||||
conn_params["verify_certs"] = False
|
||||
else:
|
||||
conn_params["verify_certs"] = True
|
||||
if username and password:
|
||||
conn_params["http_auth"] = username + ":" + password
|
||||
if api_key:
|
||||
@@ -735,6 +738,7 @@ def save_smtp_tls_report_to_elasticsearch(
|
||||
index_date = begin_date.strftime("%Y-%m")
|
||||
else:
|
||||
index_date = begin_date.strftime("%Y-%m-%d")
|
||||
report = report.copy()
|
||||
report["begin_date"] = begin_date
|
||||
report["end_date"] = end_date
|
||||
|
||||
|
||||
@@ -271,6 +271,7 @@ def set_hosts(
|
||||
*,
|
||||
use_ssl: Optional[bool] = False,
|
||||
ssl_cert_path: Optional[str] = None,
|
||||
skip_certificate_verification: bool = False,
|
||||
username: Optional[str] = None,
|
||||
password: Optional[str] = None,
|
||||
api_key: Optional[str] = None,
|
||||
@@ -286,6 +287,7 @@ def set_hosts(
|
||||
hosts (str|list[str]): A single hostname or URL, or list of hostnames or URLs
|
||||
use_ssl (bool): Use an HTTPS connection to the server
|
||||
ssl_cert_path (str): Path to the certificate chain
|
||||
skip_certificate_verification (bool): Skip certificate verification
|
||||
username (str): The username to use for authentication
|
||||
password (str): The password to use for authentication
|
||||
api_key (str): The Base64 encoded API key to use for authentication
|
||||
@@ -300,10 +302,11 @@ def set_hosts(
|
||||
if use_ssl:
|
||||
conn_params["use_ssl"] = True
|
||||
if ssl_cert_path:
|
||||
conn_params["verify_certs"] = True
|
||||
conn_params["ca_certs"] = ssl_cert_path
|
||||
else:
|
||||
if skip_certificate_verification:
|
||||
conn_params["verify_certs"] = False
|
||||
else:
|
||||
conn_params["verify_certs"] = True
|
||||
normalized_auth_type = (auth_type or "basic").strip().lower()
|
||||
if normalized_auth_type == "awssigv4":
|
||||
if not aws_region:
|
||||
@@ -764,6 +767,7 @@ def save_smtp_tls_report_to_opensearch(
|
||||
index_date = begin_date.strftime("%Y-%m")
|
||||
else:
|
||||
index_date = begin_date.strftime("%Y-%m-%d")
|
||||
report = report.copy()
|
||||
report["begin_date"] = begin_date
|
||||
report["end_date"] = end_date
|
||||
|
||||
|
||||
@@ -58,7 +58,7 @@ class HECClient(object):
|
||||
self.source = source
|
||||
self.session = requests.Session()
|
||||
self.timeout = timeout
|
||||
self.session.verify = verify
|
||||
self.verify = verify
|
||||
self._common_data: dict[str, Union[str, int, float, dict]] = dict(
|
||||
host=self.host, source=self.source, index=self.index
|
||||
)
|
||||
@@ -124,10 +124,12 @@ class HECClient(object):
|
||||
data["event"] = new_report.copy()
|
||||
json_str += "{0}\n".format(json.dumps(data))
|
||||
|
||||
if not self.session.verify:
|
||||
if not self.verify:
|
||||
logger.debug("Skipping certificate verification for Splunk HEC")
|
||||
try:
|
||||
response = self.session.post(self.url, data=json_str, timeout=self.timeout)
|
||||
response = self.session.post(
|
||||
self.url, data=json_str, verify=self.verify, timeout=self.timeout
|
||||
)
|
||||
response = response.json()
|
||||
except Exception as e:
|
||||
raise SplunkError(e.__str__())
|
||||
@@ -161,10 +163,12 @@ class HECClient(object):
|
||||
data["event"] = report.copy()
|
||||
json_str += "{0}\n".format(json.dumps(data))
|
||||
|
||||
if not self.session.verify:
|
||||
if not self.verify:
|
||||
logger.debug("Skipping certificate verification for Splunk HEC")
|
||||
try:
|
||||
response = self.session.post(self.url, data=json_str, timeout=self.timeout)
|
||||
response = self.session.post(
|
||||
self.url, data=json_str, verify=self.verify, timeout=self.timeout
|
||||
)
|
||||
response = response.json()
|
||||
except Exception as e:
|
||||
raise SplunkError(e.__str__())
|
||||
@@ -198,10 +202,12 @@ class HECClient(object):
|
||||
data["event"] = report.copy()
|
||||
json_str += "{0}\n".format(json.dumps(data))
|
||||
|
||||
if not self.session.verify:
|
||||
if not self.verify:
|
||||
logger.debug("Skipping certificate verification for Splunk HEC")
|
||||
try:
|
||||
response = self.session.post(self.url, data=json_str, timeout=self.timeout)
|
||||
response = self.session.post(
|
||||
self.url, data=json_str, verify=self.verify, timeout=self.timeout
|
||||
)
|
||||
response = response.json()
|
||||
except Exception as e:
|
||||
raise SplunkError(e.__str__())
|
||||
|
||||
@@ -335,6 +335,76 @@ def get_ip_address_country(
|
||||
return country
|
||||
|
||||
|
||||
def load_reverse_dns_map(
|
||||
reverse_dns_map: ReverseDNSMap,
|
||||
*,
|
||||
always_use_local_file: bool = False,
|
||||
local_file_path: Optional[str] = None,
|
||||
url: Optional[str] = None,
|
||||
offline: bool = False,
|
||||
) -> None:
|
||||
"""
|
||||
Loads the reverse DNS map from a URL or local file.
|
||||
|
||||
Clears and repopulates the given map dict in place. If the map is
|
||||
fetched from a URL, that is tried first; on failure (or if offline/local
|
||||
mode is selected) the bundled CSV is used as a fallback.
|
||||
|
||||
Args:
|
||||
reverse_dns_map (dict): The map dict to populate (modified in place)
|
||||
always_use_local_file (bool): Always use a local map file
|
||||
local_file_path (str): Path to a local map file
|
||||
url (str): URL to a reverse DNS map
|
||||
offline (bool): Use the built-in copy of the reverse DNS map
|
||||
"""
|
||||
if url is None:
|
||||
url = (
|
||||
"https://raw.githubusercontent.com/domainaware"
|
||||
"/parsedmarc/master/parsedmarc/"
|
||||
"resources/maps/base_reverse_dns_map.csv"
|
||||
)
|
||||
|
||||
reverse_dns_map.clear()
|
||||
|
||||
def load_csv(_csv_file):
|
||||
reader = csv.DictReader(_csv_file)
|
||||
for row in reader:
|
||||
key = row["base_reverse_dns"].lower().strip()
|
||||
reverse_dns_map[key] = {
|
||||
"name": row["name"].strip(),
|
||||
"type": row["type"].strip(),
|
||||
}
|
||||
|
||||
csv_file = io.StringIO()
|
||||
|
||||
if not (offline or always_use_local_file):
|
||||
try:
|
||||
logger.debug(f"Trying to fetch reverse DNS map from {url}...")
|
||||
headers = {"User-Agent": USER_AGENT}
|
||||
response = requests.get(url, headers=headers)
|
||||
response.raise_for_status()
|
||||
csv_file.write(response.text)
|
||||
csv_file.seek(0)
|
||||
load_csv(csv_file)
|
||||
except requests.exceptions.RequestException as e:
|
||||
logger.warning(f"Failed to fetch reverse DNS map: {e}")
|
||||
except Exception:
|
||||
logger.warning("Not a valid CSV file")
|
||||
csv_file.seek(0)
|
||||
logging.debug("Response body:")
|
||||
logger.debug(csv_file.read())
|
||||
|
||||
if len(reverse_dns_map) == 0:
|
||||
logger.info("Loading included reverse DNS map...")
|
||||
path = str(
|
||||
files(parsedmarc.resources.maps).joinpath("base_reverse_dns_map.csv")
|
||||
)
|
||||
if local_file_path is not None:
|
||||
path = local_file_path
|
||||
with open(path) as csv_file:
|
||||
load_csv(csv_file)
|
||||
|
||||
|
||||
def get_service_from_reverse_dns_base_domain(
|
||||
base_domain,
|
||||
*,
|
||||
@@ -361,55 +431,21 @@ def get_service_from_reverse_dns_base_domain(
|
||||
"""
|
||||
|
||||
base_domain = base_domain.lower().strip()
|
||||
if url is None:
|
||||
url = (
|
||||
"https://raw.githubusercontent.com/domainaware"
|
||||
"/parsedmarc/master/parsedmarc/"
|
||||
"resources/maps/base_reverse_dns_map.csv"
|
||||
)
|
||||
reverse_dns_map_value: ReverseDNSMap
|
||||
if reverse_dns_map is None:
|
||||
reverse_dns_map_value = {}
|
||||
else:
|
||||
reverse_dns_map_value = reverse_dns_map
|
||||
|
||||
def load_csv(_csv_file):
|
||||
reader = csv.DictReader(_csv_file)
|
||||
for row in reader:
|
||||
key = row["base_reverse_dns"].lower().strip()
|
||||
reverse_dns_map_value[key] = {
|
||||
"name": row["name"],
|
||||
"type": row["type"],
|
||||
}
|
||||
|
||||
csv_file = io.StringIO()
|
||||
|
||||
if not (offline or always_use_local_file) and len(reverse_dns_map_value) == 0:
|
||||
try:
|
||||
logger.debug(f"Trying to fetch reverse DNS map from {url}...")
|
||||
headers = {"User-Agent": USER_AGENT}
|
||||
response = requests.get(url, headers=headers)
|
||||
response.raise_for_status()
|
||||
csv_file.write(response.text)
|
||||
csv_file.seek(0)
|
||||
load_csv(csv_file)
|
||||
except requests.exceptions.RequestException as e:
|
||||
logger.warning(f"Failed to fetch reverse DNS map: {e}")
|
||||
except Exception:
|
||||
logger.warning("Not a valid CSV file")
|
||||
csv_file.seek(0)
|
||||
logging.debug("Response body:")
|
||||
logger.debug(csv_file.read())
|
||||
|
||||
if len(reverse_dns_map_value) == 0:
|
||||
logger.info("Loading included reverse DNS map...")
|
||||
path = str(
|
||||
files(parsedmarc.resources.maps).joinpath("base_reverse_dns_map.csv")
|
||||
load_reverse_dns_map(
|
||||
reverse_dns_map_value,
|
||||
always_use_local_file=always_use_local_file,
|
||||
local_file_path=local_file_path,
|
||||
url=url,
|
||||
offline=offline,
|
||||
)
|
||||
if local_file_path is not None:
|
||||
path = local_file_path
|
||||
with open(path) as csv_file:
|
||||
load_csv(csv_file)
|
||||
|
||||
service: ReverseDNSService
|
||||
try:
|
||||
service = reverse_dns_map_value[base_domain]
|
||||
|
||||
@@ -50,7 +50,7 @@ dependencies = [
|
||||
"lxml>=4.4.0",
|
||||
"mailsuite>=1.11.2",
|
||||
"msgraph-core==0.2.2",
|
||||
"opensearch-py>=2.4.2,<=3.0.0",
|
||||
"opensearch-py>=2.4.2,<=4.0.0",
|
||||
"publicsuffixlist>=0.10.0",
|
||||
"pygelf>=0.4.2",
|
||||
"requests>=2.22.0",
|
||||
|
||||
256
tests.py
256
tests.py
@@ -3,6 +3,8 @@
|
||||
|
||||
from __future__ import absolute_import, print_function, unicode_literals
|
||||
|
||||
import io
|
||||
import json
|
||||
import os
|
||||
import signal
|
||||
import sys
|
||||
@@ -202,9 +204,7 @@ class Test(unittest.TestCase):
|
||||
sample_content, offline=OFFLINE_MODE
|
||||
)
|
||||
assert email_result["report_type"] == "forensic"
|
||||
result = parsedmarc.parse_report_file(
|
||||
sample_path, offline=OFFLINE_MODE
|
||||
)
|
||||
result = parsedmarc.parse_report_file(sample_path, offline=OFFLINE_MODE)
|
||||
assert result["report_type"] == "forensic"
|
||||
parsedmarc.parsed_forensic_reports_to_csv(result["report"])
|
||||
print("Passed!")
|
||||
@@ -217,9 +217,7 @@ class Test(unittest.TestCase):
|
||||
if os.path.isdir(sample_path):
|
||||
continue
|
||||
print("Testing {0}: ".format(sample_path), end="")
|
||||
result = parsedmarc.parse_report_file(
|
||||
sample_path, offline=OFFLINE_MODE
|
||||
)
|
||||
result = parsedmarc.parse_report_file(sample_path, offline=OFFLINE_MODE)
|
||||
assert result["report_type"] == "smtp_tls"
|
||||
parsedmarc.parsed_smtp_tls_reports_to_csv(result["report"])
|
||||
print("Passed!")
|
||||
@@ -1281,6 +1279,22 @@ class TestImapFallbacks(unittest.TestCase):
|
||||
|
||||
|
||||
class TestMailboxWatchSince(unittest.TestCase):
|
||||
def setUp(self):
|
||||
from parsedmarc.log import logger as _logger
|
||||
|
||||
_logger.disabled = True
|
||||
self._stdout_patch = patch("sys.stdout", new_callable=io.StringIO)
|
||||
self._stderr_patch = patch("sys.stderr", new_callable=io.StringIO)
|
||||
self._stdout_patch.start()
|
||||
self._stderr_patch.start()
|
||||
|
||||
def tearDown(self):
|
||||
from parsedmarc.log import logger as _logger
|
||||
|
||||
_logger.disabled = False
|
||||
self._stderr_patch.stop()
|
||||
self._stdout_patch.stop()
|
||||
|
||||
def testWatchInboxPassesSinceToMailboxFetch(self):
|
||||
mailbox_connection = SimpleNamespace()
|
||||
|
||||
@@ -1295,7 +1309,9 @@ class TestMailboxWatchSince(unittest.TestCase):
|
||||
) as mocked:
|
||||
with self.assertRaises(_BreakLoop):
|
||||
parsedmarc.watch_inbox(
|
||||
mailbox_connection=cast(parsedmarc.MailboxConnection, mailbox_connection),
|
||||
mailbox_connection=cast(
|
||||
parsedmarc.MailboxConnection, mailbox_connection
|
||||
),
|
||||
callback=callback,
|
||||
check_timeout=1,
|
||||
batch_size=10,
|
||||
@@ -1371,6 +1387,22 @@ class _DummyMailboxConnection(parsedmarc.MailboxConnection):
|
||||
|
||||
|
||||
class TestMailboxPerformance(unittest.TestCase):
|
||||
def setUp(self):
|
||||
from parsedmarc.log import logger as _logger
|
||||
|
||||
_logger.disabled = True
|
||||
self._stdout_patch = patch("sys.stdout", new_callable=io.StringIO)
|
||||
self._stderr_patch = patch("sys.stderr", new_callable=io.StringIO)
|
||||
self._stdout_patch.start()
|
||||
self._stderr_patch.start()
|
||||
|
||||
def tearDown(self):
|
||||
from parsedmarc.log import logger as _logger
|
||||
|
||||
_logger.disabled = False
|
||||
self._stderr_patch.stop()
|
||||
self._stdout_patch.stop()
|
||||
|
||||
def testBatchModeAvoidsExtraFullFetch(self):
|
||||
connection = _DummyMailboxConnection()
|
||||
parsedmarc.get_dmarc_reports_from_mailbox(
|
||||
@@ -1920,6 +1952,22 @@ certificate_path = /tmp/msgraph-cert.pem
|
||||
class TestSighupReload(unittest.TestCase):
|
||||
"""Tests for SIGHUP-driven configuration reload in watch mode."""
|
||||
|
||||
def setUp(self):
|
||||
from parsedmarc.log import logger as _logger
|
||||
|
||||
_logger.disabled = True
|
||||
self._stdout_patch = patch("sys.stdout", new_callable=io.StringIO)
|
||||
self._stderr_patch = patch("sys.stderr", new_callable=io.StringIO)
|
||||
self._stdout_patch.start()
|
||||
self._stderr_patch.start()
|
||||
|
||||
def tearDown(self):
|
||||
from parsedmarc.log import logger as _logger
|
||||
|
||||
_logger.disabled = False
|
||||
self._stderr_patch.stop()
|
||||
self._stdout_patch.stop()
|
||||
|
||||
_BASE_CONFIG = """[general]
|
||||
silent = true
|
||||
|
||||
@@ -2231,6 +2279,200 @@ watch = true
|
||||
# Second init (after reload with v2 config): kafka_hosts should be None
|
||||
self.assertIsNone(init_opts_captures[1].kafka_hosts)
|
||||
|
||||
@unittest.skipUnless(
|
||||
hasattr(signal, "SIGHUP"),
|
||||
"SIGHUP not available on this platform",
|
||||
)
|
||||
@patch("parsedmarc.cli._init_output_clients")
|
||||
@patch("parsedmarc.cli._parse_config_file")
|
||||
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
|
||||
@patch("parsedmarc.cli.watch_inbox")
|
||||
@patch("parsedmarc.cli.IMAPConnection")
|
||||
def testReloadRefreshesReverseDnsMap(
|
||||
self,
|
||||
mock_imap,
|
||||
mock_watch,
|
||||
mock_get_reports,
|
||||
mock_parse_config,
|
||||
mock_init_clients,
|
||||
):
|
||||
"""SIGHUP reload repopulates the reverse DNS map so lookups still work."""
|
||||
import signal as signal_module
|
||||
|
||||
from parsedmarc import REVERSE_DNS_MAP
|
||||
|
||||
mock_imap.return_value = object()
|
||||
mock_get_reports.return_value = {
|
||||
"aggregate_reports": [],
|
||||
"forensic_reports": [],
|
||||
"smtp_tls_reports": [],
|
||||
}
|
||||
|
||||
def parse_side_effect(config_file, opts):
|
||||
opts.imap_host = "imap.example.com"
|
||||
opts.imap_user = "user"
|
||||
opts.imap_password = "pass"
|
||||
opts.mailbox_watch = True
|
||||
return None
|
||||
|
||||
mock_parse_config.side_effect = parse_side_effect
|
||||
mock_init_clients.return_value = {}
|
||||
|
||||
# Snapshot the map state after each watch_inbox call
|
||||
map_snapshots = []
|
||||
|
||||
watch_calls = [0]
|
||||
|
||||
def watch_side_effect(*args, **kwargs):
|
||||
watch_calls[0] += 1
|
||||
if watch_calls[0] == 1:
|
||||
if hasattr(signal_module, "SIGHUP"):
|
||||
import os
|
||||
|
||||
os.kill(os.getpid(), signal_module.SIGHUP)
|
||||
return
|
||||
else:
|
||||
# Capture the map state after reload, before we stop the loop
|
||||
map_snapshots.append(dict(REVERSE_DNS_MAP))
|
||||
raise FileExistsError("stop")
|
||||
|
||||
mock_watch.side_effect = watch_side_effect
|
||||
|
||||
with tempfile.NamedTemporaryFile("w", suffix=".ini", delete=False) as cfg:
|
||||
cfg.write(self._BASE_CONFIG)
|
||||
cfg_path = cfg.name
|
||||
self.addCleanup(lambda: os.path.exists(cfg_path) and os.remove(cfg_path))
|
||||
|
||||
# Pre-populate the map so we can verify it gets refreshed
|
||||
REVERSE_DNS_MAP.clear()
|
||||
REVERSE_DNS_MAP["stale.example.com"] = {
|
||||
"name": "Stale",
|
||||
"type": "stale",
|
||||
}
|
||||
original_contents = dict(REVERSE_DNS_MAP)
|
||||
|
||||
with patch.object(sys, "argv", ["parsedmarc", "-c", cfg_path]):
|
||||
with self.assertRaises(SystemExit):
|
||||
parsedmarc.cli._main()
|
||||
|
||||
self.assertEqual(mock_watch.call_count, 2)
|
||||
# The map should have been repopulated (not empty, not the stale data)
|
||||
self.assertEqual(len(map_snapshots), 1)
|
||||
refreshed = map_snapshots[0]
|
||||
self.assertGreater(len(refreshed), 0, "Map should not be empty after reload")
|
||||
self.assertNotEqual(
|
||||
refreshed,
|
||||
original_contents,
|
||||
"Map should have been refreshed, not kept stale data",
|
||||
)
|
||||
self.assertNotIn(
|
||||
"stale.example.com",
|
||||
refreshed,
|
||||
"Stale entry should have been cleared by reload",
|
||||
)
|
||||
|
||||
|
||||
class TestIndexPrefixDomainMapTlsFiltering(unittest.TestCase):
|
||||
"""Tests that SMTP TLS reports for unmapped domains are filtered out
|
||||
when index_prefix_domain_map is configured."""
|
||||
|
||||
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
|
||||
@patch("parsedmarc.cli.IMAPConnection")
|
||||
def testTlsReportsFilteredByDomainMap(
|
||||
self,
|
||||
mock_imap_connection,
|
||||
mock_get_reports,
|
||||
):
|
||||
"""TLS reports for domains not in the map should be silently dropped."""
|
||||
mock_imap_connection.return_value = object()
|
||||
mock_get_reports.return_value = {
|
||||
"aggregate_reports": [],
|
||||
"forensic_reports": [],
|
||||
"smtp_tls_reports": [
|
||||
{
|
||||
"organization_name": "Allowed Org",
|
||||
"begin_date": "2024-01-01T00:00:00Z",
|
||||
"end_date": "2024-01-01T23:59:59Z",
|
||||
"report_id": "allowed-1",
|
||||
"contact_info": "tls@allowed.example.com",
|
||||
"policies": [
|
||||
{
|
||||
"policy_domain": "allowed.example.com",
|
||||
"policy_type": "sts",
|
||||
"successful_session_count": 1,
|
||||
"failed_session_count": 0,
|
||||
}
|
||||
],
|
||||
},
|
||||
{
|
||||
"organization_name": "Unmapped Org",
|
||||
"begin_date": "2024-01-01T00:00:00Z",
|
||||
"end_date": "2024-01-01T23:59:59Z",
|
||||
"report_id": "unmapped-1",
|
||||
"contact_info": "tls@unmapped.example.net",
|
||||
"policies": [
|
||||
{
|
||||
"policy_domain": "unmapped.example.net",
|
||||
"policy_type": "sts",
|
||||
"successful_session_count": 5,
|
||||
"failed_session_count": 0,
|
||||
}
|
||||
],
|
||||
},
|
||||
{
|
||||
"organization_name": "Mixed Case Org",
|
||||
"begin_date": "2024-01-01T00:00:00Z",
|
||||
"end_date": "2024-01-01T23:59:59Z",
|
||||
"report_id": "mixed-case-1",
|
||||
"contact_info": "tls@mixedcase.example.com",
|
||||
"policies": [
|
||||
{
|
||||
"policy_domain": "MixedCase.Example.Com",
|
||||
"policy_type": "sts",
|
||||
"successful_session_count": 2,
|
||||
"failed_session_count": 0,
|
||||
}
|
||||
],
|
||||
},
|
||||
],
|
||||
}
|
||||
|
||||
domain_map = {"tenant_a": ["example.com"]}
|
||||
with NamedTemporaryFile("w", suffix=".yaml", delete=False) as map_file:
|
||||
import yaml
|
||||
|
||||
yaml.dump(domain_map, map_file)
|
||||
map_path = map_file.name
|
||||
self.addCleanup(lambda: os.path.exists(map_path) and os.remove(map_path))
|
||||
|
||||
config = f"""[general]
|
||||
save_smtp_tls = true
|
||||
silent = false
|
||||
index_prefix_domain_map = {map_path}
|
||||
|
||||
[imap]
|
||||
host = imap.example.com
|
||||
user = test-user
|
||||
password = test-password
|
||||
"""
|
||||
with NamedTemporaryFile("w", suffix=".ini", delete=False) as config_file:
|
||||
config_file.write(config)
|
||||
config_path = config_file.name
|
||||
self.addCleanup(lambda: os.path.exists(config_path) and os.remove(config_path))
|
||||
|
||||
captured = io.StringIO()
|
||||
with patch.object(sys, "argv", ["parsedmarc", "-c", config_path]):
|
||||
with patch("sys.stdout", captured):
|
||||
parsedmarc.cli._main()
|
||||
|
||||
output = json.loads(captured.getvalue())
|
||||
tls_reports = output["smtp_tls_reports"]
|
||||
self.assertEqual(len(tls_reports), 2)
|
||||
report_ids = {r["report_id"] for r in tls_reports}
|
||||
self.assertIn("allowed-1", report_ids)
|
||||
self.assertIn("mixed-case-1", report_ids)
|
||||
self.assertNotIn("unmapped-1", report_ids)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main(verbosity=2)
|
||||
|
||||
Reference in New Issue
Block a user