Compare commits

..

6 Commits

Author SHA1 Message Date
copilot-swe-agent[bot]
2174f23eb5 Add comprehensive TypedDicts to minimize Any usage in public APIs
Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
2025-12-17 22:28:19 +00:00
copilot-swe-agent[bot]
febbb107c4 Fix Python 3.9 compatibility: replace pipe union syntax with Union/Optional
Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
2025-12-17 22:18:57 +00:00
copilot-swe-agent[bot]
9a64b494e7 Fix code review issues: incomplete isinstance and variable name mismatch
Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
2025-12-17 21:45:21 +00:00
copilot-swe-agent[bot]
e93209c766 Fix function signatures and improve type annotations
Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
2025-12-17 21:42:25 +00:00
copilot-swe-agent[bot]
d1c22466be Replace OrderedDict with dict and add TypedDict definitions
Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
2025-12-17 21:36:57 +00:00
copilot-swe-agent[bot]
3d1b2522d3 Initial plan 2025-12-17 21:19:30 +00:00
27 changed files with 910 additions and 2690 deletions

292
.vscode/settings.json vendored
View File

@@ -13,154 +13,148 @@
"MD024": false
},
"cSpell.words": [
"adkim",
"akamaiedge",
"amsmath",
"andrewmcgilvray",
"arcname",
"aspf",
"autoclass",
"automodule",
"backported",
"bellsouth",
"boto",
"brakhane",
"Brightmail",
"CEST",
"CHACHA",
"checkdmarc",
"Codecov",
"confnew",
"dateparser",
"dateutil",
"Davmail",
"DBIP",
"dearmor",
"deflist",
"devel",
"DMARC",
"Dmarcian",
"dnspython",
"dollarmath",
"dpkg",
"exampleuser",
"expiringdict",
"fieldlist",
"GELF",
"genindex",
"geoip",
"geoipupdate",
"Geolite",
"geolocation",
"githubpages",
"Grafana",
"hostnames",
"htpasswd",
"httpasswd",
"httplib",
"ifhost",
"IMAP",
"imapclient",
"infile",
"Interaktive",
"IPDB",
"journalctl",
"kafkaclient",
"keepalive",
"keyout",
"keyrings",
"Leeman",
"libemail",
"linkify",
"LISTSERV",
"loganalytics",
"lxml",
"mailparser",
"mailrelay",
"mailsuite",
"maxdepth",
"MAXHEADERS",
"maxmind",
"mbox",
"mfrom",
"mhdw",
"michaeldavie",
"mikesiegel",
"Mimecast",
"mitigations",
"MMDB",
"modindex",
"msgconvert",
"msgraph",
"MSSP",
"multiprocess",
"Munge",
"ndjson",
"newkey",
"Nhcm",
"nojekyll",
"nondigest",
"nosecureimap",
"nosniff",
"nwettbewerb",
"opensearch",
"opensearchpy",
"parsedmarc",
"passsword",
"pbar",
"Postorius",
"premade",
"privatesuffix",
"procs",
"publicsuffix",
"publicsuffixlist",
"publixsuffix",
"pygelf",
"pypy",
"pytest",
"quickstart",
"Reindex",
"replyto",
"reversename",
"Rollup",
"Rpdm",
"SAMEORIGIN",
"sdist",
"Servernameone",
"setuptools",
"smartquotes",
"SMTPTLS",
"sortlists",
"sortmaps",
"sourcetype",
"STARTTLS",
"tasklist",
"timespan",
"tlsa",
"tlsrpt",
"toctree",
"TQDDM",
"tqdm",
"truststore",
"Übersicht",
"uids",
"Uncategorized",
"unparasable",
"uper",
"urllib",
"Valimail",
"venv",
"Vhcw",
"viewcode",
"virtualenv",
"WBITS",
"webmail",
"Wettbewerber",
"Whalen",
"whitespaces",
"xennn",
"xmltodict",
"xpack",
"zscholl"
"adkim",
"akamaiedge",
"amsmath",
"andrewmcgilvray",
"arcname",
"aspf",
"autoclass",
"automodule",
"backported",
"bellsouth",
"boto",
"brakhane",
"Brightmail",
"CEST",
"CHACHA",
"checkdmarc",
"Codecov",
"confnew",
"dateparser",
"dateutil",
"Davmail",
"DBIP",
"dearmor",
"deflist",
"devel",
"DMARC",
"Dmarcian",
"dnspython",
"dollarmath",
"dpkg",
"exampleuser",
"expiringdict",
"fieldlist",
"GELF",
"genindex",
"geoip",
"geoipupdate",
"Geolite",
"geolocation",
"githubpages",
"Grafana",
"hostnames",
"htpasswd",
"httpasswd",
"httplib",
"IMAP",
"imapclient",
"infile",
"Interaktive",
"IPDB",
"journalctl",
"keepalive",
"keyout",
"keyrings",
"Leeman",
"libemail",
"linkify",
"LISTSERV",
"lxml",
"mailparser",
"mailrelay",
"mailsuite",
"maxdepth",
"MAXHEADERS",
"maxmind",
"mbox",
"mfrom",
"michaeldavie",
"mikesiegel",
"Mimecast",
"mitigations",
"MMDB",
"modindex",
"msgconvert",
"msgraph",
"MSSP",
"multiprocess",
"Munge",
"ndjson",
"newkey",
"Nhcm",
"nojekyll",
"nondigest",
"nosecureimap",
"nosniff",
"nwettbewerb",
"opensearch",
"opensearchpy",
"parsedmarc",
"passsword",
"Postorius",
"premade",
"procs",
"publicsuffix",
"publicsuffixlist",
"publixsuffix",
"pygelf",
"pypy",
"pytest",
"quickstart",
"Reindex",
"replyto",
"reversename",
"Rollup",
"Rpdm",
"SAMEORIGIN",
"sdist",
"Servernameone",
"setuptools",
"smartquotes",
"SMTPTLS",
"sortlists",
"sortmaps",
"sourcetype",
"STARTTLS",
"tasklist",
"timespan",
"tlsa",
"tlsrpt",
"toctree",
"TQDDM",
"tqdm",
"truststore",
"Übersicht",
"uids",
"Uncategorized",
"unparasable",
"uper",
"urllib",
"Valimail",
"venv",
"Vhcw",
"viewcode",
"virtualenv",
"WBITS",
"webmail",
"Wettbewerber",
"Whalen",
"whitespaces",
"xennn",
"xmltodict",
"xpack",
"zscholl"
],
}

View File

@@ -1,42 +1,5 @@
# Changelog
## 9.0.9
### Fixes
- Validate that a string is base64-encoded before trying to base64 decode it. (PRs #648 and #649)
## 9.0.8
### Fixes
- Fix logging configuration not propagating to child parser processes (#646).
- Update `mailsuite` dependency to `?=1.11.1` to solve issues with iCloud IMAP (#493).
## 9.0.7
## Fixes
- Fix IMAP `since` option (#PR 645 closes issues #581 and #643).
## 9.0.6
### Fixes
- Fix #638.
- Fix/clarify report extraction and parsing behavior for multiple input types (bytes, base64 strings, and file-like objects).
- Fix type mismatches that could cause runtime issues in SMTP emailing and CLI option handling.
### Improvements
- Improve type hints across the library (Pylance/Pyright friendliness) and reduce false-positive linter errors.
- Emails in Microsoft 365 are now marked read as they are read. This provides constancy with other mailbox types, and gives you a indication of when emails are being read as they are processed in batches. (Close #625)
### Compatibility / Dependencies
- Set Python requirement to `>=3.9,<3.14`.
- Bump `mailsuite` requirement to `>=1.11.0`.
## 9.0.5
## Fixes

View File

@@ -44,7 +44,6 @@ Thanks to all
- Optionally send the results to Elasticsearch, Opensearch, and/or Splunk, for
use with premade dashboards
- Optionally send reports to Apache Kafka
- Optionally send reports to Google SecOps (Chronicle) in UDM format via API or stdout
## Python Compatibility
@@ -62,4 +61,4 @@ for RHEL or Debian.
| 3.11 | ✅ | Actively maintained; supported until June 2028 (Debian 12) |
| 3.12 | ✅ | Actively maintained; supported until May 2035 (RHEL 10) |
| 3.13 | ✅ | Actively maintained; supported until June 2030 (Debian 13) |
| 3.14 | ❌ | Not currently supported due to [this imapclient bug](https://github.com/mjs/imapclient/issues/618)|
| 3.14 | ❌ | Not currently supported due to Not currently supported due to [this imapclient bug](https://github.com/mjs/imapclient/issues/618)|

View File

@@ -28,13 +28,6 @@
:members:
```
## parsedmarc.types
```{eval-rst}
.. automodule:: parsedmarc.types
:members:
```
## parsedmarc.utils
```{eval-rst}

View File

@@ -1,494 +0,0 @@
# Google SecOps (Chronicle) Output
`parsedmarc` can output DMARC reports to Google SecOps (Chronicle) in UDM (Unified Data Model) format.
## Configuration
To enable Google SecOps output, add a `[google_secops]` section to your configuration file:
### Primary Method: Chronicle Ingestion API
The recommended approach is to send events directly to Chronicle via the Ingestion API:
```ini
[general]
save_aggregate = True
save_forensic = True
[google_secops]
# Required: Path to Google service account JSON credentials file
api_credentials_file = /path/to/service-account-credentials.json
# Required: Chronicle customer ID
api_customer_id = your-customer-id-here
# Optional: Chronicle region (default: us)
# Options: us, europe, asia-southeast1, me-central2, australia-southeast1
api_region = us
# Optional: Log type for Chronicle ingestion (default: DMARC)
api_log_type = DMARC
# Optional: Include forensic report message payload (default: False)
# For privacy, message bodies are excluded by default
include_ruf_payload = False
# Optional: Maximum bytes of forensic message payload to include (default: 4096)
ruf_payload_max_bytes = 4096
# Optional: Static observer name for telemetry identification
static_observer_name = my-parsedmarc-instance
# Optional: Static observer vendor (default: parsedmarc)
static_observer_vendor = parsedmarc
# Optional: Static environment label (e.g., prod, dev)
static_environment = prod
```
### Alternative Method: stdout Output
If you prefer to use an external log shipper (Fluentd, Logstash, Chronicle forwarder), set `use_stdout = True`:
```ini
[google_secops]
# Output to stdout instead of Chronicle API
use_stdout = True
# Other optional configuration options (as above)
include_ruf_payload = False
ruf_payload_max_bytes = 4096
static_observer_name = my-instance
static_observer_vendor = parsedmarc
static_environment = prod
```
## Output Format
The Google SecOps output produces newline-delimited JSON (NDJSON) in Chronicle UDM format, which can be ingested into Google SecOps for hunting and dashboarding.
### Event Types
1. **DMARC_AGGREGATE**: One event per aggregate report row, preserving count and period information
2. **DMARC_FORENSIC**: One event per forensic report
3. **SMTP_TLS_REPORT**: One event per SMTP TLS failure detail
4. **DMARC_PARSE_ERROR**: Generated when parsing fails (does not crash)
### UDM Schema
Each event includes:
- **metadata**: Event timestamp, type, product name, and vendor
- `event_timestamp`: RFC 3339 formatted timestamp
- `event_type`: Always "GENERIC_EVENT" for UDM
- `product_name`: Always "parsedmarc"
- `vendor_name`: Configurable via `static_observer_vendor` (default: "parsedmarc")
- `product_deployment_id` (optional): Set via `static_observer_name` config
- **principal**: Source IP address, location (country), and hostname (reverse DNS)
- `ip`: Array containing source IP address (always present)
- `location.country_or_region` (optional): ISO country code from IP geolocation
- `hostname` (optional): Reverse DNS hostname for source IP
- **target**: Domain name (from DMARC policy)
- `domain.name`: The domain being protected by DMARC
- **security_result**: Severity level, description, and detection fields for dashboarding
- `severity`: Derived severity level (HIGH/MEDIUM/LOW/ERROR)
- `description`: Human-readable event description
- **detection_fields**: Key DMARC dimensions for filtering and grouping
- All dashboard-relevant fields use `dmarc.*` or `smtp_tls.*` prefixes for easy identification
- Includes IP enrichment data (service name and type from reverse DNS mapping) for enhanced filtering
- See "Detection Fields" section below for complete field listings
- **additional.fields** (optional): Low-value context fields not typically used for dashboarding
- SPF/DKIM authentication details (e.g., `spf_0_domain`, `spf_0_result`)
- Forensic report metadata (e.g., `feedback_type`, `message_id`, `authentication_results`)
- Base domain, environment tags, optional message samples
**Design Rationale**: DMARC dimensions are placed in `security_result[].detection_fields` rather than `additional.fields` because Chronicle dashboards, stats searches, and aggregations work best with UDM label arrays. The `additional.fields` is a protobuf Struct intended for opaque context and is not reliably queryable for dashboard operations.
### Detection Fields
**Aggregate Report Fields** (`DMARC_AGGREGATE` events):
- `dmarc.disposition`: DMARC policy action (none, quarantine, reject)
- `dmarc.policy`: Published DMARC policy (none, quarantine, reject)
- `dmarc.pass`: Boolean - overall DMARC authentication result
- `dmarc.spf_aligned`: Boolean - SPF alignment status
- `dmarc.dkim_aligned`: Boolean - DKIM alignment status
- `dmarc.header_from`: Header From domain
- `dmarc.envelope_from`: Envelope From domain (MAIL FROM)
- `dmarc.report_org`: Reporting organization name
- `dmarc.report_id`: Unique report identifier
- `dmarc.report_begin`: Report period start timestamp
- `dmarc.report_end`: Report period end timestamp
- `dmarc.row_count`: Number of messages represented by this record
- `dmarc.spf_result` (optional): SPF authentication result (pass, fail, etc.)
- `dmarc.dkim_result` (optional): DKIM authentication result (pass, fail, etc.)
- `dmarc.source_service_name` (optional): Enriched service name from reverse DNS mapping
- `dmarc.source_service_type` (optional): Enriched service type (e.g., "Email Provider", "Webmail", "Marketing")
**Forensic Report Fields** (`DMARC_FORENSIC` events):
- `dmarc.auth_failure`: Authentication failure type(s) (dmarc, spf, dkim)
- `dmarc.reported_domain`: Domain that failed DMARC authentication
- `dmarc.source_service_name` (optional): Enriched service name from reverse DNS mapping
- `dmarc.source_service_type` (optional): Enriched service type (e.g., "Email Provider", "Webmail", "Marketing")
**SMTP TLS Report Fields** (`SMTP_TLS_REPORT` events):
- `smtp_tls.policy_domain`: Domain being monitored for TLS policy
- `smtp_tls.result_type`: Type of TLS failure (certificate-expired, validation-failure, etc.)
- `smtp_tls.failed_session_count`: Number of failed sessions
- `smtp_tls.report_org`: Reporting organization name
- `smtp_tls.report_begin`: Report period start timestamp
- `smtp_tls.report_end`: Report period end timestamp
### Severity Heuristics
- **HIGH**: DMARC disposition = reject
- **MEDIUM**: DMARC disposition = quarantine with partial SPF/DKIM failures
- **LOW**: DMARC disposition = none or pass
## Example Output
### Aggregate Report Event
```json
{
"event_type": "DMARC_AGGREGATE",
"metadata": {
"event_timestamp": "2018-06-19T00:00:00+00:00",
"event_type": "GENERIC_EVENT",
"product_name": "parsedmarc",
"vendor_name": "parsedmarc"
},
"principal": {
"ip": ["199.230.200.36"],
"location": {"country_or_region": "US"}
},
"target": {
"domain": {"name": "example.com"}
},
"security_result": [{
"severity": "LOW",
"description": "DMARC fail; SPF=pass; DKIM=pass; SPF not aligned; DKIM not aligned; disposition=none",
"detection_fields": [
{"key": "dmarc.disposition", "value": "none"},
{"key": "dmarc.policy", "value": "none"},
{"key": "dmarc.pass", "value": false},
{"key": "dmarc.spf_aligned", "value": false},
{"key": "dmarc.dkim_aligned", "value": false},
{"key": "dmarc.header_from", "value": "example.com"},
{"key": "dmarc.envelope_from", "value": "example.com"},
{"key": "dmarc.report_org", "value": "example.net"},
{"key": "dmarc.report_id", "value": "b043f0e264cf4ea995e93765242f6dfb"},
{"key": "dmarc.report_begin", "value": "2018-06-19 00:00:00"},
{"key": "dmarc.report_end", "value": "2018-06-19 23:59:59"},
{"key": "dmarc.row_count", "value": 1},
{"key": "dmarc.spf_result", "value": "pass"},
{"key": "dmarc.dkim_result", "value": "pass"},
{"key": "dmarc.source_service_name", "value": "Example Mail Service"},
{"key": "dmarc.source_service_type", "value": "Email Provider"}
]
}],
"additional": {
"fields": [
{"key": "spf_0_domain", "value": "example.com"},
{"key": "spf_0_result", "value": "pass"},
{"key": "dkim_0_domain", "value": "example.com"},
{"key": "dkim_0_result", "value": "pass"}
]
}
}
```
### Forensic Report Event
```json
{
"event_type": "DMARC_FORENSIC",
"metadata": {
"event_timestamp": "2019-04-30T02:09:00+00:00",
"event_type": "GENERIC_EVENT",
"product_name": "parsedmarc",
"vendor_name": "parsedmarc"
},
"principal": {
"ip": ["10.10.10.10"],
"location": {"country_or_region": "US"},
"hostname": "mail.example-sender.com"
},
"target": {
"domain": {"name": "example.com"}
},
"security_result": [{
"severity": "MEDIUM",
"description": "DMARC forensic report: authentication failure (dmarc)",
"detection_fields": [
{"key": "dmarc.auth_failure", "value": "dmarc"},
{"key": "dmarc.reported_domain", "value": "example.com"},
{"key": "dmarc.source_service_name", "value": "Example Mail Provider"},
{"key": "dmarc.source_service_type", "value": "Email Provider"}
]
}],
"additional": {
"fields": [
{"key": "feedback_type", "value": "auth-failure"},
{"key": "message_id", "value": "<01010101010101010101010101010101@ABAB01MS0016.someserver.loc>"},
{"key": "authentication_results", "value": "dmarc=fail (p=none; dis=none) header.from=example.com"},
{"key": "delivery_result", "value": "delivered"}
]
}
}
```
### SMTP TLS Report Event
```json
{
"event_type": "SMTP_TLS_REPORT",
"metadata": {
"event_timestamp": "2016-04-01T00:00:00+00:00",
"event_type": "GENERIC_EVENT",
"product_name": "parsedmarc",
"vendor_name": "parsedmarc"
},
"target": {
"domain": {
"name": "company-y.example"
}
},
"security_result": [{
"severity": "LOW",
"description": "SMTP TLS failure: certificate-expired",
"detection_fields": [
{"key": "smtp_tls.policy_domain", "value": "company-y.example"},
{"key": "smtp_tls.result_type", "value": "certificate-expired"},
{"key": "smtp_tls.failed_session_count", "value": 100},
{"key": "smtp_tls.report_org", "value": "Company-X"},
{"key": "smtp_tls.report_begin", "value": "2016-04-01T00:00:00Z"},
{"key": "smtp_tls.report_end", "value": "2016-04-01T23:59:59Z"}
]
}],
"principal": {
"ip": ["2001:db8:abcd:0012::1"]
}
}
```
### Parse Error Event
```json
{
"event_type": "DMARC_PARSE_ERROR",
"metadata": {
"event_timestamp": "2026-01-09T16:22:10.933751+00:00",
"event_type": "GENERIC_EVENT",
"product_name": "parsedmarc",
"vendor_name": "parsedmarc"
},
"security_result": [{
"severity": "ERROR",
"description": "Failed to parse DMARC report: Invalid XML structure"
}]
}
```
## Google SecOps Searches
Here are some example YARA-L rules you can use in Google SecOps to hunt for DMARC issues:
### Find all DMARC aggregate report failures
```yara-l
rule dmarc_aggregate_failures {
meta:
author = "parsedmarc"
description = "Detect DMARC authentication failures in aggregate reports"
events:
$e.metadata.product_name = "parsedmarc"
$e.event_type = "DMARC_AGGREGATE"
$e.security_result.detection_fields.key = "dmarc.pass"
$e.security_result.detection_fields.value = false
condition:
$e
}
```
### Find high severity DMARC events (rejected mail)
```yara-l
rule high_severity_dmarc_events {
meta:
author = "parsedmarc"
description = "Detect high severity DMARC aggregate events (rejected mail)"
events:
$e.metadata.product_name = "parsedmarc"
$e.event_type = "DMARC_AGGREGATE"
$e.security_result.severity = "HIGH"
condition:
$e
}
```
### Find repeated DMARC failures from same source IP
```yara-l
rule repeated_dmarc_failures {
meta:
author = "parsedmarc"
description = "Detect repeated DMARC failures from the same source IP"
events:
$e.metadata.product_name = "parsedmarc"
$e.event_type = "DMARC_AGGREGATE"
$e.security_result.detection_fields.key = "dmarc.pass"
$e.security_result.detection_fields.value = false
$e.principal.ip = $source_ip
match:
$source_ip over 1h
condition:
#e > 5
}
```
### Find DMARC forensic reports with authentication failures
```yara-l
rule dmarc_forensic_failures {
meta:
author = "parsedmarc"
description = "Detect DMARC forensic reports with authentication failures"
events:
$e.metadata.product_name = "parsedmarc"
$e.event_type = "DMARC_FORENSIC"
$e.security_result.detection_fields.key = "dmarc.auth_failure"
condition:
$e
}
```
### Find DMARC failures from specific mail service types
```yara-l
rule dmarc_failures_by_service_type {
meta:
author = "parsedmarc"
description = "Detect DMARC failures from specific mail service types"
events:
$e.metadata.product_name = "parsedmarc"
$e.event_type = "DMARC_AGGREGATE"
$e.security_result.detection_fields.key = "dmarc.pass"
$e.security_result.detection_fields.value = false
$e.security_result.detection_fields.key = "dmarc.source_service_type"
$e.security_result.detection_fields.value = "Email Provider"
condition:
$e
}
```
### Find SMTP TLS failures
```yara-l
rule smtp_tls_failures {
meta:
author = "parsedmarc"
description = "Detect SMTP TLS failures"
events:
$e.metadata.product_name = "parsedmarc"
$e.event_type = "SMTP_TLS_REPORT"
condition:
$e
}
```
## Privacy Considerations
By default, forensic report message bodies are **excluded** from the output to protect privacy. If you need to include message samples for investigation:
1. Set `include_ruf_payload = True` in your configuration
2. Adjust `ruf_payload_max_bytes` to limit the amount of data included (default: 4096 bytes)
3. Message samples will be truncated if they exceed the configured maximum
**Note**: Be aware of data privacy regulations (GDPR, CCPA, etc.) when including message payloads in security telemetry.
## Usage
The Google SecOps output works with all parsedmarc input methods, including file processing and mailbox monitoring.
### Primary Method: Direct API Ingestion
With Chronicle Ingestion API configured, events are sent directly to Chronicle:
```bash
# Process files - events are sent to Chronicle API automatically
parsedmarc -c config.ini samples/aggregate/*.xml
# Monitor mailbox - events are sent to Chronicle API in real-time
parsedmarc -c config.ini
```
No additional log shippers or pipelines are needed. The Google SecOps client handles authentication and batching automatically.
### Alternative Method: stdout Output with Log Shipper
If using `use_stdout = True` in your configuration, output DMARC reports to an external log shipper:
#### Processing Files
```bash
# Output to stdout
parsedmarc -c config.ini samples/aggregate/*.xml > dmarc_events.ndjson
# Stream to file
parsedmarc -c config.ini samples/aggregate/*.xml >> /var/log/dmarc/events.ndjson
# Pipe to log shipper (e.g., Fluentd, Logstash, Chronicle forwarder)
parsedmarc -c config.ini samples/aggregate/*.xml | fluentd
```
#### Monitoring Mailboxes
The Google SecOps output automatically works when monitoring mailboxes via IMAP, Microsoft Graph, or Gmail API. Configure your mailbox connection and enable watching:
```ini
[general]
save_aggregate = True
save_forensic = True
[mailbox]
watch = True
delete = False
batch_size = 10
[imap]
host = imap.example.com
user = dmarc@example.com
password = yourpassword
[google_secops]
# Use stdout mode for log shipper integration
use_stdout = True
include_ruf_payload = False
static_observer_name = mailbox-monitor
static_environment = prod
```
When watching a mailbox with stdout mode, parsedmarc continuously outputs UDM events as new reports arrive:
```bash
parsedmarc -c config.ini | fluentd
```
The output is in newline-delimited JSON format, with one UDM event per line, ready for collection by your log shipper.

View File

@@ -44,7 +44,6 @@ and Valimail.
- Optionally send the results to Elasticsearch, Opensearch, and/or Splunk, for use
with premade dashboards
- Optionally send reports to Apache Kafka
- Optionally send reports to Google SecOps (Chronicle) in UDM format
## Python Compatibility
@@ -75,7 +74,6 @@ elasticsearch
opensearch
kibana
splunk
google_secops
davmail
dmarc
contributing

File diff suppressed because it is too large Load Diff

View File

@@ -3,56 +3,53 @@
"""A CLI for parsing DMARC reports"""
import http.client
import json
import logging
from argparse import Namespace, ArgumentParser
import os
import sys
from argparse import ArgumentParser, Namespace
from configparser import ConfigParser
from glob import glob
from multiprocessing import Pipe, Process
from ssl import CERT_NONE, create_default_context
import logging
import math
import yaml
import json
from ssl import CERT_NONE, create_default_context
from multiprocessing import Pipe, Process
import sys
import http.client
from tqdm import tqdm
from parsedmarc import (
SEEN_AGGREGATE_REPORT_IDS,
InvalidDMARCReport,
get_dmarc_reports_from_mailbox,
watch_inbox,
parse_report_file,
get_dmarc_reports_from_mbox,
elastic,
opensearch,
kafkaclient,
splunk,
save_output,
email_results,
ParserError,
__version__,
elastic,
email_results,
gelf,
get_dmarc_reports_from_mailbox,
get_dmarc_reports_from_mbox,
google_secops,
kafkaclient,
loganalytics,
opensearch,
parse_report_file,
InvalidDMARCReport,
s3,
save_output,
splunk,
syslog,
watch_inbox,
loganalytics,
gelf,
webhook,
)
from parsedmarc.log import logger
from parsedmarc.mail import (
GmailConnection,
IMAPConnection,
MaildirConnection,
MSGraphConnection,
GmailConnection,
MaildirConnection,
)
from parsedmarc.mail.graph import AuthMethod
from parsedmarc.types import ParsingResults
from parsedmarc.utils import get_base_domain, get_reverse_dns, is_mbox
# Increase the max header limit for very large emails. `_MAXHEADERS` is a
# private stdlib attribute and may not exist in type stubs.
setattr(http.client, "_MAXHEADERS", 200)
from parsedmarc.log import logger
from parsedmarc.utils import is_mbox, get_reverse_dns, get_base_domain
from parsedmarc import SEEN_AGGREGATE_REPORT_IDS
http.client._MAXHEADERS = 200 # pylint:disable=protected-access
formatter = logging.Formatter(
fmt="%(levelname)8s:%(filename)s:%(lineno)d:%(message)s",
@@ -69,48 +66,6 @@ def _str_to_list(s):
return list(map(lambda i: i.lstrip(), _list))
def _configure_logging(log_level, log_file=None):
"""
Configure logging for the current process.
This is needed for child processes to properly log messages.
Args:
log_level: The logging level (e.g., logging.DEBUG, logging.WARNING)
log_file: Optional path to log file
"""
# Get the logger
from parsedmarc.log import logger
# Set the log level
logger.setLevel(log_level)
# Add StreamHandler with formatter if not already present
# Check if we already have a StreamHandler to avoid duplicates
# Use exact type check to distinguish from FileHandler subclass
has_stream_handler = any(type(h) is logging.StreamHandler for h in logger.handlers)
if not has_stream_handler:
formatter = logging.Formatter(
fmt="%(levelname)8s:%(filename)s:%(lineno)d:%(message)s",
datefmt="%Y-%m-%d:%H:%M:%S",
)
handler = logging.StreamHandler()
handler.setFormatter(formatter)
logger.addHandler(handler)
# Add FileHandler if log_file is specified
if log_file:
try:
fh = logging.FileHandler(log_file, "a")
formatter = logging.Formatter(
"%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s"
)
fh.setFormatter(formatter)
logger.addHandler(fh)
except (IOError, OSError, PermissionError) as error:
logger.warning("Unable to write to log file: {}".format(error))
def cli_parse(
file_path,
sa,
@@ -123,29 +78,8 @@ def cli_parse(
reverse_dns_map_url,
normalize_timespan_threshold_hours,
conn,
log_level=logging.ERROR,
log_file=None,
):
"""Separated this function for multiprocessing
Args:
file_path: Path to the report file
sa: Strip attachment payloads flag
nameservers: List of nameservers
dns_timeout: DNS timeout
ip_db_path: Path to IP database
offline: Offline mode flag
always_use_local_files: Always use local files flag
reverse_dns_map_path: Path to reverse DNS map
reverse_dns_map_url: URL to reverse DNS map
normalize_timespan_threshold_hours: Timespan threshold
conn: Pipe connection for IPC
log_level: Logging level for this process
log_file: Optional path to log file
"""
# Configure logging in this child process
_configure_logging(log_level, log_file)
"""Separated this function for multiprocessing"""
try:
file_results = parse_report_file(
file_path,
@@ -170,7 +104,6 @@ def _main():
"""Called when the module is executed"""
def get_index_prefix(report):
domain = None
if index_prefix_domain_map is None:
return None
if "policy_published" in report:
@@ -204,7 +137,7 @@ def _main():
print(output_str)
if opts.output:
save_output(
reports_,
results,
output_directory=opts.output,
aggregate_json_filename=opts.aggregate_json_filename,
forensic_json_filename=opts.forensic_json_filename,
@@ -285,14 +218,6 @@ def _main():
except Exception as error_:
logger.error("GELF Error: {0}".format(error_.__str__()))
try:
if opts.google_secops:
events = google_secops_client.save_aggregate_report_to_google_secops(report)
for event in events:
print(event)
except Exception as error_:
logger.error("Google SecOps Error: {0}".format(error_.__str__()))
try:
if opts.webhook_aggregate_url:
indent_value = 2 if opts.prettify_json else None
@@ -378,14 +303,6 @@ def _main():
except Exception as error_:
logger.error("GELF Error: {0}".format(error_.__str__()))
try:
if opts.google_secops:
events = google_secops_client.save_forensic_report_to_google_secops(report)
for event in events:
print(event)
except Exception as error_:
logger.error("Google SecOps Error: {0}".format(error_.__str__()))
try:
if opts.webhook_forensic_url:
indent_value = 2 if opts.prettify_json else None
@@ -471,14 +388,6 @@ def _main():
except Exception as error_:
logger.error("GELF Error: {0}".format(error_.__str__()))
try:
if opts.google_secops:
events = google_secops_client.save_smtp_tls_report_to_google_secops(report)
for event in events:
print(event)
except Exception as error_:
logger.error("Google SecOps Error: {0}".format(error_.__str__()))
try:
if opts.webhook_smtp_tls_url:
indent_value = 2 if opts.prettify_json else None
@@ -747,17 +656,6 @@ def _main():
gelf_host=None,
gelf_port=None,
gelf_mode=None,
google_secops=False,
google_secops_include_ruf_payload=False,
google_secops_ruf_payload_max_bytes=4096,
google_secops_static_observer_name=None,
google_secops_static_observer_vendor="parsedmarc",
google_secops_static_environment=None,
google_secops_api_credentials_file=None,
google_secops_api_customer_id=None,
google_secops_api_region="us",
google_secops_api_log_type="DMARC",
google_secops_use_stdout=False,
webhook_aggregate_url=None,
webhook_forensic_url=None,
webhook_smtp_tls_url=None,
@@ -778,7 +676,7 @@ def _main():
if "general" in config.sections():
general_config = config["general"]
if "silent" in general_config:
opts.silent = bool(general_config.getboolean("silent"))
opts.silent = general_config.getboolean("silent")
if "normalize_timespan_threshold_hours" in general_config:
opts.normalize_timespan_threshold_hours = general_config.getfloat(
"normalize_timespan_threshold_hours"
@@ -787,10 +685,10 @@ def _main():
with open(general_config["index_prefix_domain_map"]) as f:
index_prefix_domain_map = yaml.safe_load(f)
if "offline" in general_config:
opts.offline = bool(general_config.getboolean("offline"))
opts.offline = general_config.getboolean("offline")
if "strip_attachment_payloads" in general_config:
opts.strip_attachment_payloads = bool(
general_config.getboolean("strip_attachment_payloads")
opts.strip_attachment_payloads = general_config.getboolean(
"strip_attachment_payloads"
)
if "output" in general_config:
opts.output = general_config["output"]
@@ -808,8 +706,6 @@ def _main():
opts.smtp_tls_csv_filename = general_config["smtp_tls_csv_filename"]
if "dns_timeout" in general_config:
opts.dns_timeout = general_config.getfloat("dns_timeout")
if opts.dns_timeout is None:
opts.dns_timeout = 2
if "dns_test_address" in general_config:
opts.dns_test_address = general_config["dns_test_address"]
if "nameservers" in general_config:
@@ -832,19 +728,19 @@ def _main():
)
exit(-1)
if "save_aggregate" in general_config:
opts.save_aggregate = bool(general_config.getboolean("save_aggregate"))
opts.save_aggregate = general_config.getboolean("save_aggregate")
if "save_forensic" in general_config:
opts.save_forensic = bool(general_config.getboolean("save_forensic"))
opts.save_forensic = general_config.getboolean("save_forensic")
if "save_smtp_tls" in general_config:
opts.save_smtp_tls = bool(general_config.getboolean("save_smtp_tls"))
opts.save_smtp_tls = general_config.getboolean("save_smtp_tls")
if "debug" in general_config:
opts.debug = bool(general_config.getboolean("debug"))
opts.debug = general_config.getboolean("debug")
if "verbose" in general_config:
opts.verbose = bool(general_config.getboolean("verbose"))
opts.verbose = general_config.getboolean("verbose")
if "silent" in general_config:
opts.silent = bool(general_config.getboolean("silent"))
opts.silent = general_config.getboolean("silent")
if "warnings" in general_config:
opts.warnings = bool(general_config.getboolean("warnings"))
opts.warnings = general_config.getboolean("warnings")
if "log_file" in general_config:
opts.log_file = general_config["log_file"]
if "n_procs" in general_config:
@@ -854,15 +750,15 @@ def _main():
else:
opts.ip_db_path = None
if "always_use_local_files" in general_config:
opts.always_use_local_files = bool(
general_config.getboolean("always_use_local_files")
opts.always_use_local_files = general_config.getboolean(
"always_use_local_files"
)
if "reverse_dns_map_path" in general_config:
opts.reverse_dns_map_path = general_config["reverse_dns_path"]
if "reverse_dns_map_url" in general_config:
opts.reverse_dns_map_url = general_config["reverse_dns_url"]
if "prettify_json" in general_config:
opts.prettify_json = bool(general_config.getboolean("prettify_json"))
opts.prettify_json = general_config.getboolean("prettify_json")
if "mailbox" in config.sections():
mailbox_config = config["mailbox"]
@@ -873,11 +769,11 @@ def _main():
if "archive_folder" in mailbox_config:
opts.mailbox_archive_folder = mailbox_config["archive_folder"]
if "watch" in mailbox_config:
opts.mailbox_watch = bool(mailbox_config.getboolean("watch"))
opts.mailbox_watch = mailbox_config.getboolean("watch")
if "delete" in mailbox_config:
opts.mailbox_delete = bool(mailbox_config.getboolean("delete"))
opts.mailbox_delete = mailbox_config.getboolean("delete")
if "test" in mailbox_config:
opts.mailbox_test = bool(mailbox_config.getboolean("test"))
opts.mailbox_test = mailbox_config.getboolean("test")
if "batch_size" in mailbox_config:
opts.mailbox_batch_size = mailbox_config.getint("batch_size")
if "check_timeout" in mailbox_config:
@@ -901,14 +797,14 @@ def _main():
if "port" in imap_config:
opts.imap_port = imap_config.getint("port")
if "timeout" in imap_config:
opts.imap_timeout = imap_config.getint("timeout")
opts.imap_timeout = imap_config.getfloat("timeout")
if "max_retries" in imap_config:
opts.imap_max_retries = imap_config.getint("max_retries")
if "ssl" in imap_config:
opts.imap_ssl = bool(imap_config.getboolean("ssl"))
opts.imap_ssl = imap_config.getboolean("ssl")
if "skip_certificate_verification" in imap_config:
opts.imap_skip_certificate_verification = bool(
imap_config.getboolean("skip_certificate_verification")
opts.imap_skip_certificate_verification = imap_config.getboolean(
"skip_certificate_verification"
)
if "user" in imap_config:
opts.imap_user = imap_config["user"]
@@ -937,7 +833,7 @@ def _main():
"section instead."
)
if "watch" in imap_config:
opts.mailbox_watch = bool(imap_config.getboolean("watch"))
opts.mailbox_watch = imap_config.getboolean("watch")
logger.warning(
"Use of the watch option in the imap "
"configuration section has been deprecated. "
@@ -952,7 +848,7 @@ def _main():
"section instead."
)
if "test" in imap_config:
opts.mailbox_test = bool(imap_config.getboolean("test"))
opts.mailbox_test = imap_config.getboolean("test")
logger.warning(
"Use of the test option in the imap "
"configuration section has been deprecated. "
@@ -1046,8 +942,8 @@ def _main():
opts.graph_url = graph_config["graph_url"]
if "allow_unencrypted_storage" in graph_config:
opts.graph_allow_unencrypted_storage = bool(
graph_config.getboolean("allow_unencrypted_storage")
opts.graph_allow_unencrypted_storage = graph_config.getboolean(
"allow_unencrypted_storage"
)
if "elasticsearch" in config:
@@ -1075,10 +971,10 @@ def _main():
if "index_prefix" in elasticsearch_config:
opts.elasticsearch_index_prefix = elasticsearch_config["index_prefix"]
if "monthly_indexes" in elasticsearch_config:
monthly = bool(elasticsearch_config.getboolean("monthly_indexes"))
monthly = elasticsearch_config.getboolean("monthly_indexes")
opts.elasticsearch_monthly_indexes = monthly
if "ssl" in elasticsearch_config:
opts.elasticsearch_ssl = bool(elasticsearch_config.getboolean("ssl"))
opts.elasticsearch_ssl = elasticsearch_config.getboolean("ssl")
if "cert_path" in elasticsearch_config:
opts.elasticsearch_ssl_cert_path = elasticsearch_config["cert_path"]
if "user" in elasticsearch_config:
@@ -1115,10 +1011,10 @@ def _main():
if "index_prefix" in opensearch_config:
opts.opensearch_index_prefix = opensearch_config["index_prefix"]
if "monthly_indexes" in opensearch_config:
monthly = bool(opensearch_config.getboolean("monthly_indexes"))
monthly = opensearch_config.getboolean("monthly_indexes")
opts.opensearch_monthly_indexes = monthly
if "ssl" in opensearch_config:
opts.opensearch_ssl = bool(opensearch_config.getboolean("ssl"))
opts.opensearch_ssl = opensearch_config.getboolean("ssl")
if "cert_path" in opensearch_config:
opts.opensearch_ssl_cert_path = opensearch_config["cert_path"]
if "user" in opensearch_config:
@@ -1172,11 +1068,9 @@ def _main():
if "password" in kafka_config:
opts.kafka_password = kafka_config["password"]
if "ssl" in kafka_config:
opts.kafka_ssl = bool(kafka_config.getboolean("ssl"))
opts.kafka_ssl = kafka_config.getboolean("ssl")
if "skip_certificate_verification" in kafka_config:
kafka_verify = bool(
kafka_config.getboolean("skip_certificate_verification")
)
kafka_verify = kafka_config.getboolean("skip_certificate_verification")
opts.kafka_skip_certificate_verification = kafka_verify
if "aggregate_topic" in kafka_config:
opts.kafka_aggregate_topic = kafka_config["aggregate_topic"]
@@ -1208,11 +1102,9 @@ def _main():
if "port" in smtp_config:
opts.smtp_port = smtp_config.getint("port")
if "ssl" in smtp_config:
opts.smtp_ssl = bool(smtp_config.getboolean("ssl"))
opts.smtp_ssl = smtp_config.getboolean("ssl")
if "skip_certificate_verification" in smtp_config:
smtp_verify = bool(
smtp_config.getboolean("skip_certificate_verification")
)
smtp_verify = smtp_config.getboolean("skip_certificate_verification")
opts.smtp_skip_certificate_verification = smtp_verify
if "user" in smtp_config:
opts.smtp_user = smtp_config["user"]
@@ -1280,11 +1172,11 @@ def _main():
gmail_api_config = config["gmail_api"]
opts.gmail_api_credentials_file = gmail_api_config.get("credentials_file")
opts.gmail_api_token_file = gmail_api_config.get("token_file", ".token")
opts.gmail_api_include_spam_trash = bool(
gmail_api_config.getboolean("include_spam_trash", False)
opts.gmail_api_include_spam_trash = gmail_api_config.getboolean(
"include_spam_trash", False
)
opts.gmail_api_paginate_messages = bool(
gmail_api_config.getboolean("paginate_messages", True)
opts.gmail_api_paginate_messages = gmail_api_config.getboolean(
"paginate_messages", True
)
opts.gmail_api_scopes = gmail_api_config.get(
"scopes", default_gmail_api_scope
@@ -1298,9 +1190,7 @@ def _main():
if "maildir" in config.sections():
maildir_api_config = config["maildir"]
opts.maildir_path = maildir_api_config.get("maildir_path")
opts.maildir_create = bool(
maildir_api_config.getboolean("maildir_create", fallback=False)
)
opts.maildir_create = maildir_api_config.get("maildir_create")
if "log_analytics" in config.sections():
log_analytics_config = config["log_analytics"]
@@ -1337,50 +1227,6 @@ def _main():
logger.critical("mode setting missing from the gelf config section")
exit(-1)
if "google_secops" in config.sections():
google_secops_config = config["google_secops"]
opts.google_secops = True
if "include_ruf_payload" in google_secops_config:
opts.google_secops_include_ruf_payload = bool(
google_secops_config.getboolean("include_ruf_payload")
)
if "ruf_payload_max_bytes" in google_secops_config:
opts.google_secops_ruf_payload_max_bytes = google_secops_config.getint(
"ruf_payload_max_bytes"
)
if "static_observer_name" in google_secops_config:
opts.google_secops_static_observer_name = google_secops_config[
"static_observer_name"
]
if "static_observer_vendor" in google_secops_config:
opts.google_secops_static_observer_vendor = google_secops_config[
"static_observer_vendor"
]
if "static_environment" in google_secops_config:
opts.google_secops_static_environment = google_secops_config[
"static_environment"
]
if "api_credentials_file" in google_secops_config:
opts.google_secops_api_credentials_file = google_secops_config[
"api_credentials_file"
]
if "api_customer_id" in google_secops_config:
opts.google_secops_api_customer_id = google_secops_config[
"api_customer_id"
]
if "api_region" in google_secops_config:
opts.google_secops_api_region = google_secops_config[
"api_region"
]
if "api_log_type" in google_secops_config:
opts.google_secops_api_log_type = google_secops_config[
"api_log_type"
]
if "use_stdout" in google_secops_config:
opts.google_secops_use_stdout = google_secops_config.getboolean(
"use_stdout"
)
if "webhook" in config.sections():
webhook_config = config["webhook"]
if "aggregate_url" in webhook_config:
@@ -1439,11 +1285,6 @@ def _main():
es_aggregate_index = "{0}{1}".format(prefix, es_aggregate_index)
es_forensic_index = "{0}{1}".format(prefix, es_forensic_index)
es_smtp_tls_index = "{0}{1}".format(prefix, es_smtp_tls_index)
elastic_timeout_value = (
float(opts.elasticsearch_timeout)
if opts.elasticsearch_timeout is not None
else 60.0
)
elastic.set_hosts(
opts.elasticsearch_hosts,
use_ssl=opts.elasticsearch_ssl,
@@ -1451,7 +1292,7 @@ def _main():
username=opts.elasticsearch_username,
password=opts.elasticsearch_password,
api_key=opts.elasticsearch_api_key,
timeout=elastic_timeout_value,
timeout=opts.elasticsearch_timeout,
)
elastic.migrate_indexes(
aggregate_indexes=[es_aggregate_index],
@@ -1476,11 +1317,6 @@ def _main():
os_aggregate_index = "{0}{1}".format(prefix, os_aggregate_index)
os_forensic_index = "{0}{1}".format(prefix, os_forensic_index)
os_smtp_tls_index = "{0}{1}".format(prefix, os_smtp_tls_index)
opensearch_timeout_value = (
float(opts.opensearch_timeout)
if opts.opensearch_timeout is not None
else 60.0
)
opensearch.set_hosts(
opts.opensearch_hosts,
use_ssl=opts.opensearch_ssl,
@@ -1488,7 +1324,7 @@ def _main():
username=opts.opensearch_username,
password=opts.opensearch_password,
api_key=opts.opensearch_api_key,
timeout=opensearch_timeout_value,
timeout=opts.opensearch_timeout,
)
opensearch.migrate_indexes(
aggregate_indexes=[os_aggregate_index],
@@ -1559,23 +1395,6 @@ def _main():
except Exception as error_:
logger.error("GELF Error: {0}".format(error_.__str__()))
if opts.google_secops:
try:
google_secops_client = google_secops.GoogleSecOpsClient(
include_ruf_payload=opts.google_secops_include_ruf_payload,
ruf_payload_max_bytes=opts.google_secops_ruf_payload_max_bytes,
static_observer_name=opts.google_secops_static_observer_name,
static_observer_vendor=opts.google_secops_static_observer_vendor,
static_environment=opts.google_secops_static_environment,
api_credentials_file=opts.google_secops_api_credentials_file,
api_customer_id=opts.google_secops_api_customer_id,
api_region=opts.google_secops_api_region,
api_log_type=opts.google_secops_api_log_type,
use_stdout=opts.google_secops_use_stdout,
)
except Exception as error_:
logger.error("Google SecOps Error: {0}".format(error_.__str__()))
if (
opts.webhook_aggregate_url
or opts.webhook_forensic_url
@@ -1614,23 +1433,16 @@ def _main():
results = []
pbar = None
if sys.stdout.isatty():
pbar = tqdm(total=len(file_paths))
n_procs = int(opts.n_procs or 1)
if n_procs < 1:
n_procs = 1
# Capture the current log level to pass to child processes
current_log_level = logger.level
current_log_file = opts.log_file
for batch_index in range((len(file_paths) + n_procs - 1) // n_procs):
for batch_index in range(math.ceil(len(file_paths) / opts.n_procs)):
processes = []
connections = []
for proc_index in range(n_procs * batch_index, n_procs * (batch_index + 1)):
for proc_index in range(
opts.n_procs * batch_index, opts.n_procs * (batch_index + 1)
):
if proc_index >= len(file_paths):
break
@@ -1651,8 +1463,6 @@ def _main():
opts.reverse_dns_map_url,
opts.normalize_timespan_threshold_hours,
child_conn,
current_log_level,
current_log_file,
),
)
processes.append(process)
@@ -1665,12 +1475,9 @@ def _main():
for proc in processes:
proc.join()
if pbar is not None:
if sys.stdout.isatty():
counter += 1
pbar.update(1)
if pbar is not None:
pbar.close()
pbar.update(counter - pbar.n)
for result in results:
if isinstance(result[0], ParserError) or result[0] is None:
@@ -1694,11 +1501,6 @@ def _main():
smtp_tls_reports.append(result[0]["report"])
for mbox_path in mbox_paths:
normalize_timespan_threshold_hours_value = (
float(opts.normalize_timespan_threshold_hours)
if opts.normalize_timespan_threshold_hours is not None
else 24.0
)
strip = opts.strip_attachment_payloads
reports = get_dmarc_reports_from_mbox(
mbox_path,
@@ -1710,17 +1512,13 @@ def _main():
reverse_dns_map_path=opts.reverse_dns_map_path,
reverse_dns_map_url=opts.reverse_dns_map_url,
offline=opts.offline,
normalize_timespan_threshold_hours=normalize_timespan_threshold_hours_value,
normalize_timespan_threshold_hours=opts.normalize_timespan_threshold_hours,
)
aggregate_reports += reports["aggregate_reports"]
forensic_reports += reports["forensic_reports"]
smtp_tls_reports += reports["smtp_tls_reports"]
mailbox_connection = None
mailbox_batch_size_value = 10
mailbox_check_timeout_value = 30
normalize_timespan_threshold_hours_value = 24.0
if opts.imap_host:
try:
if opts.imap_user is None or opts.imap_password is None:
@@ -1736,20 +1534,13 @@ def _main():
if not opts.imap_ssl:
ssl = False
imap_timeout = (
int(opts.imap_timeout) if opts.imap_timeout is not None else 30
)
imap_max_retries = (
int(opts.imap_max_retries) if opts.imap_max_retries is not None else 4
)
imap_port_value = int(opts.imap_port) if opts.imap_port is not None else 993
mailbox_connection = IMAPConnection(
host=opts.imap_host,
port=imap_port_value,
port=opts.imap_port,
ssl=ssl,
verify=verify,
timeout=imap_timeout,
max_retries=imap_max_retries,
timeout=opts.imap_timeout,
max_retries=opts.imap_max_retries,
user=opts.imap_user,
password=opts.imap_password,
)
@@ -1770,7 +1561,7 @@ def _main():
username=opts.graph_user,
password=opts.graph_password,
token_file=opts.graph_token_file,
allow_unencrypted_storage=bool(opts.graph_allow_unencrypted_storage),
allow_unencrypted_storage=opts.graph_allow_unencrypted_storage,
graph_url=opts.graph_url,
)
@@ -1815,24 +1606,11 @@ def _main():
exit(1)
if mailbox_connection:
mailbox_batch_size_value = (
int(opts.mailbox_batch_size) if opts.mailbox_batch_size is not None else 10
)
mailbox_check_timeout_value = (
int(opts.mailbox_check_timeout)
if opts.mailbox_check_timeout is not None
else 30
)
normalize_timespan_threshold_hours_value = (
float(opts.normalize_timespan_threshold_hours)
if opts.normalize_timespan_threshold_hours is not None
else 24.0
)
try:
reports = get_dmarc_reports_from_mailbox(
connection=mailbox_connection,
delete=opts.mailbox_delete,
batch_size=mailbox_batch_size_value,
batch_size=opts.mailbox_batch_size,
reports_folder=opts.mailbox_reports_folder,
archive_folder=opts.mailbox_archive_folder,
ip_db_path=opts.ip_db_path,
@@ -1844,7 +1622,7 @@ def _main():
test=opts.mailbox_test,
strip_attachment_payloads=opts.strip_attachment_payloads,
since=opts.mailbox_since,
normalize_timespan_threshold_hours=normalize_timespan_threshold_hours_value,
normalize_timespan_threshold_hours=opts.normalize_timespan_threshold_hours,
)
aggregate_reports += reports["aggregate_reports"]
@@ -1855,31 +1633,27 @@ def _main():
logger.exception("Mailbox Error")
exit(1)
parsing_results: ParsingResults = {
"aggregate_reports": aggregate_reports,
"forensic_reports": forensic_reports,
"smtp_tls_reports": smtp_tls_reports,
}
results = dict(
[
("aggregate_reports", aggregate_reports),
("forensic_reports", forensic_reports),
("smtp_tls_reports", smtp_tls_reports),
]
)
process_reports(parsing_results)
process_reports(results)
if opts.smtp_host:
try:
verify = True
if opts.smtp_skip_certificate_verification:
verify = False
smtp_port_value = int(opts.smtp_port) if opts.smtp_port is not None else 25
smtp_to_value = (
list(opts.smtp_to)
if isinstance(opts.smtp_to, list)
else _str_to_list(str(opts.smtp_to))
)
email_results(
parsing_results,
results,
opts.smtp_host,
opts.smtp_from,
smtp_to_value,
port=smtp_port_value,
opts.smtp_to,
port=opts.smtp_port,
verify=verify,
username=opts.smtp_user,
password=opts.smtp_password,
@@ -1901,17 +1675,17 @@ def _main():
archive_folder=opts.mailbox_archive_folder,
delete=opts.mailbox_delete,
test=opts.mailbox_test,
check_timeout=mailbox_check_timeout_value,
check_timeout=opts.mailbox_check_timeout,
nameservers=opts.nameservers,
dns_timeout=opts.dns_timeout,
strip_attachment_payloads=opts.strip_attachment_payloads,
batch_size=mailbox_batch_size_value,
batch_size=opts.mailbox_batch_size,
ip_db_path=opts.ip_db_path,
always_use_local_files=opts.always_use_local_files,
reverse_dns_map_path=opts.reverse_dns_map_path,
reverse_dns_map_url=opts.reverse_dns_map_url,
offline=opts.offline,
normalize_timespan_threshold_hours=normalize_timespan_threshold_hours_value,
normalize_timespan_threshold_hours=opts.normalize_timespan_threshold_hours,
)
except FileExistsError as error:
logger.error("{0}".format(error.__str__()))

View File

@@ -1,3 +1,3 @@
__version__ = "9.0.9"
__version__ = "9.0.5"
USER_AGENT = f"parsedmarc/{__version__}"

View File

@@ -2,28 +2,29 @@
from __future__ import annotations
from typing import Any, Optional, Union
from typing import Optional, Union, Any
from elasticsearch.helpers import reindex
from elasticsearch_dsl.search import Q
from elasticsearch_dsl import (
Boolean,
Date,
connections,
Object,
Document,
Index,
Nested,
InnerDoc,
Integer,
Ip,
Nested,
Object,
Search,
Text,
connections,
Boolean,
Ip,
Date,
Search,
)
from elasticsearch_dsl.search import Q
from elasticsearch.helpers import reindex
from parsedmarc import InvalidForensicReport
from parsedmarc.log import logger
from parsedmarc.utils import human_timestamp_to_datetime
from parsedmarc import InvalidForensicReport
class ElasticsearchError(Exception):
@@ -92,17 +93,17 @@ class _AggregateReportDoc(Document):
spf_results = Nested(_SPFResult)
def add_policy_override(self, type_: str, comment: str):
self.policy_overrides.append(_PolicyOverride(type=type_, comment=comment)) # pyright: ignore[reportCallIssue]
self.policy_overrides.append(_PolicyOverride(type=type_, comment=comment))
def add_dkim_result(self, domain: str, selector: str, result: _DKIMResult):
self.dkim_results.append(
_DKIMResult(domain=domain, selector=selector, result=result)
) # pyright: ignore[reportCallIssue]
)
def add_spf_result(self, domain: str, scope: str, result: _SPFResult):
self.spf_results.append(_SPFResult(domain=domain, scope=scope, result=result)) # pyright: ignore[reportCallIssue]
self.spf_results.append(_SPFResult(domain=domain, scope=scope, result=result))
def save(self, **kwargs): # pyright: ignore[reportIncompatibleMethodOverride]
def save(self, **kwargs):
self.passed_dmarc = False
self.passed_dmarc = self.spf_aligned or self.dkim_aligned
@@ -136,25 +137,25 @@ class _ForensicSampleDoc(InnerDoc):
attachments = Nested(_EmailAttachmentDoc)
def add_to(self, display_name: str, address: str):
self.to.append(_EmailAddressDoc(display_name=display_name, address=address)) # pyright: ignore[reportCallIssue]
self.to.append(_EmailAddressDoc(display_name=display_name, address=address))
def add_reply_to(self, display_name: str, address: str):
self.reply_to.append(
_EmailAddressDoc(display_name=display_name, address=address)
) # pyright: ignore[reportCallIssue]
)
def add_cc(self, display_name: str, address: str):
self.cc.append(_EmailAddressDoc(display_name=display_name, address=address)) # pyright: ignore[reportCallIssue]
self.cc.append(_EmailAddressDoc(display_name=display_name, address=address))
def add_bcc(self, display_name: str, address: str):
self.bcc.append(_EmailAddressDoc(display_name=display_name, address=address)) # pyright: ignore[reportCallIssue]
self.bcc.append(_EmailAddressDoc(display_name=display_name, address=address))
def add_attachment(self, filename: str, content_type: str, sha256: str):
self.attachments.append(
_EmailAttachmentDoc(
filename=filename, content_type=content_type, sha256=sha256
)
) # pyright: ignore[reportCallIssue]
)
class _ForensicReportDoc(Document):
@@ -222,7 +223,7 @@ class _SMTPTLSPolicyDoc(InnerDoc):
additional_information=additional_information_uri,
failure_reason_code=failure_reason_code,
)
self.failure_details.append(_details) # pyright: ignore[reportCallIssue]
self.failure_details.append(_details)
class _SMTPTLSReportDoc(Document):
@@ -256,7 +257,7 @@ class _SMTPTLSReportDoc(Document):
policy_string=policy_string,
mx_host_patterns=mx_host_patterns,
failure_details=failure_details,
) # pyright: ignore[reportCallIssue]
)
class AlreadySaved(ValueError):
@@ -266,18 +267,18 @@ class AlreadySaved(ValueError):
def set_hosts(
hosts: Union[str, list[str]],
*,
use_ssl: bool = False,
use_ssl: Optional[bool] = False,
ssl_cert_path: Optional[str] = None,
username: Optional[str] = None,
password: Optional[str] = None,
api_key: Optional[str] = None,
timeout: float = 60.0,
timeout: Optional[float] = 60.0,
):
"""
Sets the Elasticsearch hosts to use
Args:
hosts (str | list[str]): A single hostname or URL, or list of hostnames or URLs
hosts (Union[str, list[str]]): A single hostname or URL, or list of hostnames or URLs
use_ssl (bool): Use an HTTPS connection to the server
ssl_cert_path (str): Path to the certificate chain
username (str): The username to use for authentication
@@ -367,7 +368,7 @@ def migrate_indexes(
}
Index(new_index_name).create()
Index(new_index_name).put_mapping(doc_type=doc, body=body)
reindex(connections.get_connection(), aggregate_index_name, new_index_name) # pyright: ignore[reportArgumentType]
reindex(connections.get_connection(), aggregate_index_name, new_index_name)
Index(aggregate_index_name).delete()
for forensic_index in forensic_indexes:
@@ -379,8 +380,8 @@ def save_aggregate_report_to_elasticsearch(
index_suffix: Optional[str] = None,
index_prefix: Optional[str] = None,
monthly_indexes: Optional[bool] = False,
number_of_shards: int = 1,
number_of_replicas: int = 0,
number_of_shards: Optional[int] = 1,
number_of_replicas: Optional[int] = 0,
):
"""
Saves a parsed DMARC aggregate report to Elasticsearch
@@ -410,11 +411,11 @@ def save_aggregate_report_to_elasticsearch(
else:
index_date = begin_date.strftime("%Y-%m-%d")
org_name_query = Q(dict(match_phrase=dict(org_name=org_name))) # type: ignore
report_id_query = Q(dict(match_phrase=dict(report_id=report_id))) # pyright: ignore[reportArgumentType]
domain_query = Q(dict(match_phrase={"published_policy.domain": domain})) # pyright: ignore[reportArgumentType]
begin_date_query = Q(dict(match=dict(date_begin=begin_date))) # pyright: ignore[reportArgumentType]
end_date_query = Q(dict(match=dict(date_end=end_date))) # pyright: ignore[reportArgumentType]
org_name_query = Q(dict(match_phrase=dict(org_name=org_name)))
report_id_query = Q(dict(match_phrase=dict(report_id=report_id)))
domain_query = Q(dict(match_phrase={"published_policy.domain": domain}))
begin_date_query = Q(dict(match=dict(date_begin=begin_date)))
end_date_query = Q(dict(match=dict(date_end=end_date)))
if index_suffix is not None:
search_index = "dmarc_aggregate_{0}*".format(index_suffix)
@@ -426,12 +427,13 @@ def save_aggregate_report_to_elasticsearch(
query = org_name_query & report_id_query & domain_query
query = query & begin_date_query & end_date_query
search.query = query
begin_date_human = begin_date.strftime("%Y-%m-%d %H:%M:%SZ")
end_date_human = end_date.strftime("%Y-%m-%d %H:%M:%SZ")
try:
existing = search.execute()
except Exception as error_:
begin_date_human = begin_date.strftime("%Y-%m-%d %H:%M:%SZ")
end_date_human = end_date.strftime("%Y-%m-%d %H:%M:%SZ")
raise ElasticsearchError(
"Elasticsearch's search for existing report \
error: {}".format(error_.__str__())
@@ -527,7 +529,7 @@ def save_aggregate_report_to_elasticsearch(
number_of_shards=number_of_shards, number_of_replicas=number_of_replicas
)
create_indexes([index], index_settings)
agg_doc.meta.index = index # pyright: ignore[reportOptionalMemberAccess, reportAttributeAccessIssue]
agg_doc.meta.index = index
try:
agg_doc.save()
@@ -567,7 +569,7 @@ def save_forensic_report_to_elasticsearch(
sample_date = forensic_report["parsed_sample"]["date"]
sample_date = human_timestamp_to_datetime(sample_date)
original_headers = forensic_report["parsed_sample"]["headers"]
headers: dict[str, Any] = {}
headers = dict()
for original_header in original_headers:
headers[original_header.lower()] = original_headers[original_header]
@@ -581,7 +583,7 @@ def save_forensic_report_to_elasticsearch(
if index_prefix is not None:
search_index = "{0}{1}".format(index_prefix, search_index)
search = Search(index=search_index)
q = Q(dict(match=dict(arrival_date=arrival_date_epoch_milliseconds))) # pyright: ignore[reportArgumentType]
q = Q(dict(match=dict(arrival_date=arrival_date_epoch_milliseconds)))
from_ = None
to_ = None
@@ -596,7 +598,7 @@ def save_forensic_report_to_elasticsearch(
from_ = dict()
from_["sample.headers.from"] = headers["from"]
from_query = Q(dict(match_phrase=from_)) # pyright: ignore[reportArgumentType]
from_query = Q(dict(match_phrase=from_))
q = q & from_query
if "to" in headers:
# We convert the TO header from a string list to a flat string.
@@ -608,12 +610,12 @@ def save_forensic_report_to_elasticsearch(
to_ = dict()
to_["sample.headers.to"] = headers["to"]
to_query = Q(dict(match_phrase=to_)) # pyright: ignore[reportArgumentType]
to_query = Q(dict(match_phrase=to_))
q = q & to_query
if "subject" in headers:
subject = headers["subject"]
subject_query = {"match_phrase": {"sample.headers.subject": subject}}
q = q & Q(subject_query) # pyright: ignore[reportArgumentType]
q = q & Q(subject_query)
search.query = q
existing = search.execute()
@@ -691,7 +693,7 @@ def save_forensic_report_to_elasticsearch(
number_of_shards=number_of_shards, number_of_replicas=number_of_replicas
)
create_indexes([index], index_settings)
forensic_doc.meta.index = index # pyright: ignore[reportAttributeAccessIssue, reportOptionalMemberAccess]
forensic_doc.meta.index = index
try:
forensic_doc.save()
except Exception as e:
@@ -706,9 +708,9 @@ def save_smtp_tls_report_to_elasticsearch(
report: dict[str, Any],
index_suffix: Optional[str] = None,
index_prefix: Optional[str] = None,
monthly_indexes: bool = False,
number_of_shards: int = 1,
number_of_replicas: int = 0,
monthly_indexes: Optional[bool] = False,
number_of_shards: Optional[int] = 1,
number_of_replicas: Optional[int] = 0,
):
"""
Saves a parsed SMTP TLS report to Elasticsearch
@@ -738,10 +740,10 @@ def save_smtp_tls_report_to_elasticsearch(
report["begin_date"] = begin_date
report["end_date"] = end_date
org_name_query = Q(dict(match_phrase=dict(org_name=org_name))) # pyright: ignore[reportArgumentType]
report_id_query = Q(dict(match_phrase=dict(report_id=report_id))) # pyright: ignore[reportArgumentType]
begin_date_query = Q(dict(match=dict(date_begin=begin_date))) # pyright: ignore[reportArgumentType]
end_date_query = Q(dict(match=dict(date_end=end_date))) # pyright: ignore[reportArgumentType]
org_name_query = Q(dict(match_phrase=dict(org_name=org_name)))
report_id_query = Q(dict(match_phrase=dict(report_id=report_id)))
begin_date_query = Q(dict(match=dict(date_begin=begin_date)))
end_date_query = Q(dict(match=dict(date_end=end_date)))
if index_suffix is not None:
search_index = "smtp_tls_{0}*".format(index_suffix)
@@ -842,10 +844,10 @@ def save_smtp_tls_report_to_elasticsearch(
additional_information_uri=additional_information_uri,
failure_reason_code=failure_reason_code,
)
smtp_tls_doc.policies.append(policy_doc) # pyright: ignore[reportCallIssue]
smtp_tls_doc.policies.append(policy_doc)
create_indexes([index], index_settings)
smtp_tls_doc.meta.index = index # pyright: ignore[reportOptionalMemberAccess, reportAttributeAccessIssue]
smtp_tls_doc.meta.index = index
try:
smtp_tls_doc.save()

View File

@@ -2,18 +2,20 @@
from __future__ import annotations
import logging
import logging.handlers
import threading
from typing import Any
from pygelf import GelfTcpHandler, GelfTlsHandler, GelfUdpHandler
import logging
import logging.handlers
import json
import threading
from parsedmarc import (
parsed_aggregate_reports_to_csv_rows,
parsed_forensic_reports_to_csv_rows,
parsed_smtp_tls_reports_to_csv_rows,
)
from pygelf import GelfTcpHandler, GelfUdpHandler, GelfTlsHandler
log_context_data = threading.local()
@@ -50,7 +52,9 @@ class GelfClient(object):
)
self.logger.addHandler(self.handler)
def save_aggregate_report_to_gelf(self, aggregate_reports: list[dict[str, Any]]):
def save_aggregate_report_to_gelf(
self, aggregate_reports: list[dict[str, Any]]
):
rows = parsed_aggregate_reports_to_csv_rows(aggregate_reports)
for row in rows:
log_context_data.parsedmarc = row
@@ -58,14 +62,14 @@ class GelfClient(object):
log_context_data.parsedmarc = None
def save_forensic_report_to_gelf(self, forensic_reports: list[dict[str, Any]]):
def save_forensic_report_to_gelf(
self, forensic_reports: list[dict[str, Any]]
):
rows = parsed_forensic_reports_to_csv_rows(forensic_reports)
for row in rows:
log_context_data.parsedmarc = row
self.logger.info("parsedmarc forensic report")
self.logger.info(json.dumps(row))
def save_smtp_tls_report_to_gelf(self, smtp_tls_reports: dict[str, Any]):
rows = parsed_smtp_tls_reports_to_csv_rows(smtp_tls_reports)
for row in rows:
log_context_data.parsedmarc = row
self.logger.info("parsedmarc smtptls report")
self.logger.info(json.dumps(row))

View File

@@ -1,709 +0,0 @@
# -*- coding: utf-8 -*-
"""Google SecOps (Chronicle) output module for parsedmarc"""
from __future__ import annotations
import json
from datetime import datetime, timezone
from typing import Any, Optional
import requests
from google.auth.transport.requests import Request
from google.oauth2 import service_account
from parsedmarc.constants import USER_AGENT
from parsedmarc.log import logger
from parsedmarc.utils import human_timestamp_to_datetime
class GoogleSecOpsError(RuntimeError):
"""Raised when a Google SecOps API error occurs"""
class GoogleSecOpsClient:
"""A client for Google SecOps (Chronicle) UDM output"""
def __init__(
self,
include_ruf_payload: bool = False,
ruf_payload_max_bytes: int = 4096,
static_observer_name: Optional[str] = None,
static_observer_vendor: str = "parsedmarc",
static_environment: Optional[str] = None,
api_credentials_file: Optional[str] = None,
api_customer_id: Optional[str] = None,
api_region: str = "us",
api_log_type: str = "DMARC",
use_stdout: bool = False,
):
"""
Initializes the GoogleSecOpsClient
Args:
include_ruf_payload: Include RUF message payload in output
ruf_payload_max_bytes: Maximum bytes of RUF payload to include
static_observer_name: Static observer name for telemetry
static_observer_vendor: Static observer vendor (default: parsedmarc)
static_environment: Static environment (prod/dev/custom string)
api_credentials_file: Path to Google service account JSON credentials
api_customer_id: Chronicle customer ID (required for API)
api_region: Chronicle region (us, europe, asia-southeast1, etc.)
api_log_type: Log type for Chronicle ingestion (default: DMARC)
use_stdout: Output to stdout instead of API (default: False)
"""
self.include_ruf_payload = include_ruf_payload
self.ruf_payload_max_bytes = ruf_payload_max_bytes
self.static_observer_name = static_observer_name
self.static_observer_vendor = static_observer_vendor
self.static_environment = static_environment
self.use_stdout = use_stdout
# API configuration
self.api_credentials_file = api_credentials_file
self.api_customer_id = api_customer_id
self.api_region = api_region
self.api_log_type = api_log_type
self.credentials = None
self.session = None
# Initialize API client if not using stdout
if not self.use_stdout:
if not self.api_credentials_file or not self.api_customer_id:
raise GoogleSecOpsError(
"api_credentials_file and api_customer_id are required when not using stdout. "
"Set use_stdout=True to output to stdout instead."
)
self._initialize_api_client()
def _initialize_api_client(self):
"""Initialize the Chronicle API client with authentication"""
try:
logger.debug("Initializing Chronicle API client")
# Load service account credentials
self.credentials = service_account.Credentials.from_service_account_file(
self.api_credentials_file,
scopes=["https://www.googleapis.com/auth/cloud-platform"],
)
# Create session with authentication
self.session = requests.Session()
self.session.headers.update({"User-Agent": USER_AGENT})
logger.info("Chronicle API client initialized successfully")
except Exception as e:
raise GoogleSecOpsError(f"Failed to initialize Chronicle API client: {e}")
def _get_api_endpoint(self) -> str:
"""Get the Chronicle Ingestion API endpoint based on region"""
return f"https://{self.api_region}-chronicle.googleapis.com/v1alpha/projects/{self.api_customer_id}/locations/{self.api_region}/instances/default/logTypes/{self.api_log_type}/logs:import"
def _send_events_to_api(self, events: list[str]) -> None:
"""
Send UDM events to Chronicle Ingestion API
Args:
events: List of NDJSON event strings
"""
if not events:
return
try:
# Refresh credentials if needed
if not self.credentials.valid:
self.credentials.refresh(Request())
# Prepare request
endpoint = self._get_api_endpoint()
headers = {
"Authorization": f"Bearer {self.credentials.token}",
"Content-Type": "application/json",
}
# Chronicle expects events in inline_source format
# Each event should be a separate log entry
payload = {
"inline_source": {
"logs": [json.loads(event) for event in events]
}
}
logger.debug(f"Sending {len(events)} events to Chronicle API")
response = self.session.post(
endpoint,
headers=headers,
json=payload,
timeout=60,
)
if response.status_code == 200:
logger.info(f"Successfully sent {len(events)} events to Chronicle")
else:
error_msg = f"Chronicle API error: {response.status_code} - {response.text}"
logger.error(error_msg)
raise GoogleSecOpsError(error_msg)
except GoogleSecOpsError:
raise
except Exception as e:
raise GoogleSecOpsError(f"Failed to send events to Chronicle API: {e}")
def _output_events(self, events: list[str]) -> None:
"""
Output events either to stdout or Chronicle API
Args:
events: List of NDJSON event strings
"""
if self.use_stdout:
# Output to stdout for collection by external log shippers
for event in events:
print(event)
else:
# Send directly to Chronicle API
self._send_events_to_api(events)
def _get_severity(self, disposition: str, spf_aligned: bool, dkim_aligned: bool) -> str:
"""
Derive severity from DMARC disposition and alignment
Args:
disposition: DMARC policy disposition
spf_aligned: Whether SPF is aligned
dkim_aligned: Whether DKIM is aligned
Returns:
Severity level: HIGH, MEDIUM, or LOW
"""
if disposition == "reject":
return "HIGH"
elif disposition == "quarantine" and not (spf_aligned or dkim_aligned):
return "MEDIUM"
else:
return "LOW"
def _get_description(
self,
dmarc_pass: bool,
spf_result: Optional[str],
dkim_result: Optional[str],
spf_aligned: bool,
dkim_aligned: bool,
disposition: str,
) -> str:
"""
Generate description for the event
Args:
dmarc_pass: Whether DMARC passed
spf_result: SPF result
dkim_result: DKIM result
spf_aligned: Whether SPF is aligned
dkim_aligned: Whether DKIM is aligned
disposition: DMARC disposition
Returns:
Human-readable description
"""
parts = []
if dmarc_pass:
parts.append("DMARC pass")
else:
parts.append("DMARC fail")
if spf_result:
parts.append(f"SPF={spf_result}")
if dkim_result:
parts.append(f"DKIM={dkim_result}")
if spf_aligned:
parts.append("SPF aligned")
elif spf_result:
parts.append("SPF not aligned")
if dkim_aligned:
parts.append("DKIM aligned")
elif dkim_result:
parts.append("DKIM not aligned")
parts.append(f"disposition={disposition}")
return "; ".join(parts)
def _format_timestamp(self, timestamp_str: str) -> str:
"""
Convert parsedmarc timestamp to RFC 3339 format
Args:
timestamp_str: Timestamp string from parsedmarc
Returns:
RFC 3339 formatted timestamp
"""
try:
dt = human_timestamp_to_datetime(timestamp_str)
# Ensure timezone-aware datetime
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt.isoformat()
except Exception:
# Fallback to current time if parsing fails
return datetime.now(timezone.utc).isoformat()
def save_aggregate_report_to_google_secops(
self, aggregate_report: dict[str, Any]
) -> list[str]:
"""
Convert aggregate DMARC report to Google SecOps UDM format and send to Chronicle
When use_stdout=False: Events are sent to Chronicle API, returns empty list
When use_stdout=True: Returns list of NDJSON event strings for stdout
Args:
aggregate_report: Aggregate report dictionary from parsedmarc
Returns:
List of NDJSON event strings (empty if sent to API)
"""
logger.debug("Converting aggregate report to Google SecOps UDM format")
events = []
try:
report_metadata = aggregate_report["report_metadata"]
policy_published = aggregate_report["policy_published"]
for record in aggregate_report["records"]:
# Extract values
source_ip = record["source"]["ip_address"]
source_country = record["source"].get("country")
source_reverse_dns = record["source"].get("reverse_dns")
source_base_domain = record["source"].get("base_domain")
source_name = record["source"].get("name")
source_type = record["source"].get("type")
header_from = record["identifiers"]["header_from"]
envelope_from = record["identifiers"]["envelope_from"]
disposition = record["policy_evaluated"]["disposition"]
spf_aligned = record["alignment"]["spf"]
dkim_aligned = record["alignment"]["dkim"]
dmarc_pass = record["alignment"]["dmarc"]
count = record["count"]
interval_begin = record["interval_begin"]
# Get auth results
spf_results = record["auth_results"].get("spf", [])
dkim_results = record["auth_results"].get("dkim", [])
spf_result = spf_results[0]["result"] if spf_results else None
dkim_result = dkim_results[0]["result"] if dkim_results else None
# Build UDM event
event: dict[str, Any] = {
"event_type": "DMARC_AGGREGATE",
"metadata": {
"event_timestamp": self._format_timestamp(interval_begin),
"event_type": "GENERIC_EVENT",
"product_name": "parsedmarc",
"vendor_name": self.static_observer_vendor,
},
"principal": {
"ip": [source_ip],
},
"target": {
"domain": {
"name": header_from,
}
},
"security_result": [
{
"severity": self._get_severity(
disposition, spf_aligned, dkim_aligned
),
"description": self._get_description(
dmarc_pass,
spf_result,
dkim_result,
spf_aligned,
dkim_aligned,
disposition,
),
"detection_fields": [
{"key": "dmarc.disposition", "value": disposition},
{"key": "dmarc.policy", "value": policy_published["p"]},
{"key": "dmarc.pass", "value": dmarc_pass},
{"key": "dmarc.spf_aligned", "value": spf_aligned},
{"key": "dmarc.dkim_aligned", "value": dkim_aligned},
{"key": "dmarc.header_from", "value": header_from},
{"key": "dmarc.envelope_from", "value": envelope_from},
{"key": "dmarc.report_org", "value": report_metadata["org_name"]},
{"key": "dmarc.report_id", "value": report_metadata["report_id"]},
{"key": "dmarc.report_begin", "value": report_metadata["begin_date"]},
{"key": "dmarc.report_end", "value": report_metadata["end_date"]},
{"key": "dmarc.row_count", "value": count},
],
}
],
}
# Add optional fields to detection_fields
if spf_result:
event["security_result"][0]["detection_fields"].append(
{"key": "dmarc.spf_result", "value": spf_result}
)
if dkim_result:
event["security_result"][0]["detection_fields"].append(
{"key": "dmarc.dkim_result", "value": dkim_result}
)
if source_name:
event["security_result"][0]["detection_fields"].append(
{"key": "dmarc.source_service_name", "value": source_name}
)
if source_type:
event["security_result"][0]["detection_fields"].append(
{"key": "dmarc.source_service_type", "value": source_type}
)
# Add optional context fields (low-value, not for dashboarding)
additional_context = []
if source_base_domain:
additional_context.append(
{"key": "source_base_domain", "value": source_base_domain}
)
if self.static_environment:
additional_context.append(
{"key": "environment", "value": self.static_environment}
)
# Add SPF auth results to context
if spf_results:
for idx, spf in enumerate(spf_results):
additional_context.append(
{"key": f"spf_{idx}_domain", "value": spf.get("domain", "")}
)
additional_context.append(
{"key": f"spf_{idx}_result", "value": spf.get("result", "")}
)
# Add DKIM auth results to context
if dkim_results:
for idx, dkim in enumerate(dkim_results):
additional_context.append(
{"key": f"dkim_{idx}_domain", "value": dkim.get("domain", "")}
)
additional_context.append(
{"key": f"dkim_{idx}_result", "value": dkim.get("result", "")}
)
# Only add additional section if there's context to include
if additional_context:
event["additional"] = {"fields": additional_context}
# Add optional UDM fields
if source_country:
event["principal"]["location"] = {"country_or_region": source_country}
if source_reverse_dns:
event["principal"]["hostname"] = source_reverse_dns
if self.static_observer_name:
event["metadata"]["product_deployment_id"] = self.static_observer_name
events.append(json.dumps(event, ensure_ascii=False))
except Exception as e:
logger.error(f"Error converting aggregate report to Google SecOps format: {e}")
# Generate error event
error_event: dict[str, Any] = {
"event_type": "DMARC_PARSE_ERROR",
"metadata": {
"event_timestamp": datetime.now(timezone.utc).isoformat(),
"event_type": "GENERIC_EVENT",
"product_name": "parsedmarc",
"vendor_name": self.static_observer_vendor,
},
"security_result": [
{
"severity": "ERROR",
"description": f"Failed to parse DMARC aggregate report: {str(e)}",
}
],
}
events.append(json.dumps(error_event, ensure_ascii=False))
# Output events (to stdout or API)
self._output_events(events)
# Return events only if using stdout (for CLI to print)
return events if self.use_stdout else []
def save_forensic_report_to_google_secops(
self, forensic_report: dict[str, Any]
) -> list[str]:
"""
Convert forensic DMARC report to Google SecOps UDM format and send to Chronicle
When use_stdout=False: Events are sent to Chronicle API, returns empty list
When use_stdout=True: Returns list of NDJSON event strings for stdout
Args:
forensic_report: Forensic report dictionary from parsedmarc
Returns:
List of NDJSON event strings (empty if sent to API)
"""
logger.debug("Converting forensic report to Google SecOps UDM format")
events = []
try:
source_ip = forensic_report["source"]["ip_address"]
source_country = forensic_report["source"].get("country")
source_reverse_dns = forensic_report["source"].get("reverse_dns")
source_base_domain = forensic_report["source"].get("base_domain")
source_name = forensic_report["source"].get("name")
source_type = forensic_report["source"].get("type")
reported_domain = forensic_report["reported_domain"]
arrival_date = forensic_report["arrival_date_utc"]
auth_failure = forensic_report.get("auth_failure", [])
# Determine severity - forensic reports indicate failures
# Default to MEDIUM for authentication failures
severity = "MEDIUM"
# Build description
auth_failure_str = ", ".join(auth_failure) if auth_failure else "unknown"
description = f"DMARC forensic report: authentication failure ({auth_failure_str})"
# Build UDM event
event: dict[str, Any] = {
"event_type": "DMARC_FORENSIC",
"metadata": {
"event_timestamp": self._format_timestamp(arrival_date),
"event_type": "GENERIC_EVENT",
"product_name": "parsedmarc",
"vendor_name": self.static_observer_vendor,
},
"principal": {
"ip": [source_ip],
},
"target": {
"domain": {
"name": reported_domain,
}
},
"security_result": [
{
"severity": severity,
"description": description,
"detection_fields": [
{"key": "dmarc.auth_failure", "value": auth_failure_str},
{"key": "dmarc.reported_domain", "value": reported_domain},
],
}
],
}
# Add optional fields to detection_fields (matching aggregate report field names)
if source_name:
event["security_result"][0]["detection_fields"].append(
{"key": "dmarc.source_service_name", "value": source_name}
)
if source_type:
event["security_result"][0]["detection_fields"].append(
{"key": "dmarc.source_service_type", "value": source_type}
)
# Add optional context fields (low-value, not for dashboarding)
additional_context = []
if source_base_domain:
additional_context.append(
{"key": "source_base_domain", "value": source_base_domain}
)
if forensic_report.get("feedback_type"):
additional_context.append(
{"key": "feedback_type", "value": forensic_report["feedback_type"]}
)
if forensic_report.get("message_id"):
additional_context.append(
{"key": "message_id", "value": forensic_report["message_id"]}
)
if forensic_report.get("authentication_results"):
additional_context.append(
{"key": "authentication_results", "value": forensic_report["authentication_results"]}
)
if forensic_report.get("delivery_result"):
additional_context.append(
{"key": "delivery_result", "value": forensic_report["delivery_result"]}
)
if self.static_environment:
additional_context.append(
{"key": "environment", "value": self.static_environment}
)
# Add payload excerpt if enabled
if self.include_ruf_payload and forensic_report.get("sample"):
sample = forensic_report["sample"]
if len(sample) > self.ruf_payload_max_bytes:
sample = sample[:self.ruf_payload_max_bytes] + "... [truncated]"
additional_context.append(
{"key": "message_sample", "value": sample}
)
# Only add additional section if there's context to include
if additional_context:
event["additional"] = {"fields": additional_context}
# Add optional UDM fields
if source_country:
event["principal"]["location"] = {"country_or_region": source_country}
if source_reverse_dns:
event["principal"]["hostname"] = source_reverse_dns
if self.static_observer_name:
event["metadata"]["product_deployment_id"] = self.static_observer_name
events.append(json.dumps(event, ensure_ascii=False))
except Exception as e:
logger.error(f"Error converting forensic report to Google SecOps format: {e}")
# Generate error event
error_event: dict[str, Any] = {
"event_type": "DMARC_PARSE_ERROR",
"metadata": {
"event_timestamp": datetime.now(timezone.utc).isoformat(),
"event_type": "GENERIC_EVENT",
"product_name": "parsedmarc",
"vendor_name": self.static_observer_vendor,
},
"security_result": [
{
"severity": "ERROR",
"description": f"Failed to parse DMARC forensic report: {str(e)}",
}
],
}
events.append(json.dumps(error_event, ensure_ascii=False))
# Output events (to stdout or API)
self._output_events(events)
# Return events only if using stdout (for CLI to print)
return events if self.use_stdout else []
def save_smtp_tls_report_to_google_secops(
self, smtp_tls_report: dict[str, Any]
) -> list[str]:
"""
Convert SMTP TLS report to Google SecOps UDM format and send to Chronicle
When use_stdout=False: Events are sent to Chronicle API, returns empty list
When use_stdout=True: Returns list of NDJSON event strings for stdout
Args:
smtp_tls_report: SMTP TLS report dictionary from parsedmarc
Returns:
List of NDJSON event strings (empty if sent to API)
"""
logger.debug("Converting SMTP TLS report to Google SecOps UDM format")
events = []
try:
organization_name = smtp_tls_report.get("organization_name", "")
begin_date = smtp_tls_report["begin_date"]
end_date = smtp_tls_report["end_date"]
for policy in smtp_tls_report.get("policies", []):
policy_domain = policy["policy_domain"]
for failure in policy.get("failure_details", []):
# Build UDM event for each failure
event: dict[str, Any] = {
"event_type": "SMTP_TLS_REPORT",
"metadata": {
"event_timestamp": self._format_timestamp(begin_date),
"event_type": "GENERIC_EVENT",
"product_name": "parsedmarc",
"vendor_name": self.static_observer_vendor,
},
"target": {
"domain": {
"name": policy_domain,
}
},
"security_result": [
{
"severity": "LOW",
"description": f"SMTP TLS failure: {failure.get('result_type', 'unknown')}",
"detection_fields": [
{"key": "smtp_tls.policy_domain", "value": policy_domain},
{"key": "smtp_tls.result_type", "value": failure.get("result_type", "")},
{"key": "smtp_tls.failed_session_count", "value": failure.get("failed_session_count", 0)},
{"key": "smtp_tls.report_org", "value": organization_name},
{"key": "smtp_tls.report_begin", "value": begin_date},
{"key": "smtp_tls.report_end", "value": end_date},
],
}
],
}
# Add optional context fields (low-value, not for dashboarding)
additional_context = []
if self.static_environment:
additional_context.append(
{"key": "environment", "value": self.static_environment}
)
# Only add additional section if there's context to include
if additional_context:
event["additional"] = {"fields": additional_context}
# Add optional UDM fields
if failure.get("sending_mta_ip"):
event["principal"] = {"ip": [failure["sending_mta_ip"]]}
if self.static_observer_name:
event["metadata"]["product_deployment_id"] = self.static_observer_name
events.append(json.dumps(event, ensure_ascii=False))
except Exception as e:
logger.error(f"Error converting SMTP TLS report to Google SecOps format: {e}")
# Generate error event
error_event: dict[str, Any] = {
"event_type": "DMARC_PARSE_ERROR",
"metadata": {
"event_timestamp": datetime.now(timezone.utc).isoformat(),
"event_type": "GENERIC_EVENT",
"product_name": "parsedmarc",
"vendor_name": self.static_observer_vendor,
},
"security_result": [
{
"severity": "ERROR",
"description": f"Failed to parse SMTP TLS report: {str(e)}",
}
],
}
events.append(json.dumps(error_event, ensure_ascii=False))
# Output events (to stdout or API)
self._output_events(events)
# Return events only if using stdout (for CLI to print)
return events if self.use_stdout else []

View File

@@ -2,16 +2,18 @@
from __future__ import annotations
import json
from ssl import SSLContext, create_default_context
from typing import Any, Optional, Union
from ssl import SSLContext
import json
from ssl import create_default_context
from kafka import KafkaProducer
from kafka.errors import NoBrokersAvailable, UnknownTopicOrPartitionError
from parsedmarc.utils import human_timestamp_to_datetime
from parsedmarc import __version__
from parsedmarc.log import logger
from parsedmarc.utils import human_timestamp_to_datetime
class KafkaError(RuntimeError):
@@ -46,7 +48,7 @@ class KafkaClient(object):
``$ConnectionString``, and the password is the
Azure Event Hub connection string.
"""
config: dict[str, Any] = dict(
config = dict(
value_serializer=lambda v: json.dumps(v).encode("utf-8"),
bootstrap_servers=kafka_hosts,
client_id="parsedmarc-{0}".format(__version__),

View File

@@ -4,12 +4,11 @@ from __future__ import annotations
from typing import Any
from parsedmarc.log import logger
from azure.core.exceptions import HttpResponseError
from azure.identity import ClientSecretCredential
from azure.monitor.ingestion import LogsIngestionClient
from parsedmarc.log import logger
class LogAnalyticsException(Exception):
"""Raised when an Elasticsearch error occurs"""
@@ -133,7 +132,7 @@ class LogAnalyticsClient(object):
def publish_results(
self,
results: dict[str, Any],
results: dict[str, dict[str, Any]],
save_aggregate: bool,
save_forensic: bool,
save_smtp_tls: bool,

View File

@@ -116,14 +116,14 @@ class GmailConnection(MailboxConnection):
else:
return [id for id in self._fetch_all_message_ids(reports_label_id)]
def fetch_message(self, message_id) -> str:
def fetch_message(self, message_id):
msg = (
self.service.users()
.messages()
.get(userId="me", id=message_id, format="raw")
.execute()
)
return urlsafe_b64decode(msg["raw"]).decode(errors="replace")
return urlsafe_b64decode(msg["raw"])
def delete_message(self, message_id: str):
self.service.users().messages().delete(userId="me", id=message_id)

View File

@@ -6,7 +6,7 @@ from enum import Enum
from functools import lru_cache
from pathlib import Path
from time import sleep
from typing import Any, List, Optional, Union
from typing import List, Optional
from azure.identity import (
UsernamePasswordCredential,
@@ -28,7 +28,7 @@ class AuthMethod(Enum):
def _get_cache_args(token_path: Path, allow_unencrypted_storage):
cache_args: dict[str, Any] = {
cache_args = {
"cache_persistence_options": TokenCachePersistenceOptions(
name="parsedmarc", allow_unencrypted_storage=allow_unencrypted_storage
)
@@ -151,9 +151,9 @@ class MSGraphConnection(MailboxConnection):
else:
logger.warning(f"Unknown response {resp.status_code} {resp.json()}")
def fetch_messages(self, reports_folder: str, **kwargs) -> List[str]:
def fetch_messages(self, folder_name: str, **kwargs) -> List[str]:
"""Returns a list of message UIDs in the specified folder"""
folder_id = self._find_folder_id_from_folder_path(reports_folder)
folder_id = self._find_folder_id_from_folder_path(folder_name)
url = f"/users/{self.mailbox_name}/mailFolders/{folder_id}/messages"
since = kwargs.get("since")
if not since:
@@ -166,7 +166,7 @@ class MSGraphConnection(MailboxConnection):
def _get_all_messages(self, url, batch_size, since):
messages: list
params: dict[str, Union[str, int]] = {"$select": "id"}
params = {"$select": "id"}
if since:
params["$filter"] = f"receivedDateTime ge {since}"
if batch_size and batch_size > 0:

View File

@@ -2,7 +2,7 @@
from __future__ import annotations
from typing import cast
from typing import Optional
from time import sleep
@@ -17,14 +17,15 @@ from parsedmarc.mail.mailbox_connection import MailboxConnection
class IMAPConnection(MailboxConnection):
def __init__(
self,
host: str,
user: str,
password: str,
port: int = 993,
ssl: bool = True,
verify: bool = True,
timeout: int = 30,
max_retries: int = 4,
host: Optional[str] = None,
*,
user: Optional[str] = None,
password: Optional[str] = None,
port: Optional[str] = None,
ssl: Optional[bool] = True,
verify: Optional[bool] = True,
timeout: Optional[int] = 30,
max_retries: Optional[int] = 4,
):
self._username = user
self._password = password
@@ -46,13 +47,13 @@ class IMAPConnection(MailboxConnection):
def fetch_messages(self, reports_folder: str, **kwargs):
self._client.select_folder(reports_folder)
since = kwargs.get("since")
if since is not None:
return self._client.search(f"SINCE {since}")
if since:
return self._client.search(["SINCE", since])
else:
return self._client.search()
def fetch_message(self, message_id: int):
return cast(str, self._client.fetch_message(message_id, parse=False))
return self._client.fetch_message(message_id, parse=False)
def delete_message(self, message_id: int):
self._client.delete_messages([message_id])

View File

@@ -13,16 +13,16 @@ class MailboxConnection(ABC):
def create_folder(self, folder_name: str):
raise NotImplementedError
def fetch_messages(self, reports_folder: str, **kwargs):
def fetch_messages(self, reports_folder: str, **kwargs) -> list[str]:
raise NotImplementedError
def fetch_message(self, message_id) -> str:
raise NotImplementedError
def delete_message(self, message_id):
def delete_message(self, message_id: str):
raise NotImplementedError
def move_message(self, message_id, folder_name: str):
def move_message(self, message_id: str, folder_name: str):
raise NotImplementedError
def keepalive(self):

View File

@@ -2,20 +2,21 @@
from __future__ import annotations
import mailbox
import os
from typing import Optional
from time import sleep
from typing import Dict
from parsedmarc.log import logger
from parsedmarc.mail.mailbox_connection import MailboxConnection
import mailbox
import os
class MaildirConnection(MailboxConnection):
def __init__(
self,
maildir_path: str,
maildir_create: bool = False,
maildir_path: Optional[bool] = None,
maildir_create: Optional[bool] = False,
):
self._maildir_path = maildir_path
self._maildir_create = maildir_create
@@ -32,31 +33,27 @@ class MaildirConnection(MailboxConnection):
)
raise Exception(ex)
self._client = mailbox.Maildir(maildir_path, create=maildir_create)
self._subfolder_client: Dict[str, mailbox.Maildir] = {}
self._subfolder_client = {}
def create_folder(self, folder_name: str):
self._subfolder_client[folder_name] = self._client.add_folder(folder_name)
self._client.add_folder(folder_name)
def fetch_messages(self, reports_folder: str, **kwargs):
return self._client.keys()
def fetch_message(self, message_id: str) -> str:
msg = self._client.get(message_id)
if msg is not None:
msg = msg.as_string()
if msg is not None:
return msg
return ""
def fetch_message(self, message_id: str):
return self._client.get(message_id).as_string()
def delete_message(self, message_id: str):
self._client.remove(message_id)
def move_message(self, message_id: str, folder_name: str):
message_data = self._client.get(message_id)
if message_data is None:
return
if folder_name not in self._subfolder_client:
self._subfolder_client[folder_name] = self._client.add_folder(folder_name)
if folder_name not in self._subfolder_client.keys():
self._subfolder_client = mailbox.Maildir(
os.join(self.maildir_path, folder_name), create=self.maildir_create
)
self._subfolder_client[folder_name].add(message_data)
self._client.remove(message_id)

View File

@@ -2,28 +2,29 @@
from __future__ import annotations
from typing import Any, Optional, Union
from typing import Optional, Union, Any
from opensearchpy import (
Boolean,
Date,
Q,
connections,
Object,
Document,
Index,
Nested,
InnerDoc,
Integer,
Ip,
Nested,
Object,
Q,
Search,
Text,
connections,
Boolean,
Ip,
Date,
Search,
)
from opensearchpy.helpers import reindex
from parsedmarc import InvalidForensicReport
from parsedmarc.log import logger
from parsedmarc.utils import human_timestamp_to_datetime
from parsedmarc import InvalidForensicReport
class OpenSearchError(Exception):
@@ -102,7 +103,7 @@ class _AggregateReportDoc(Document):
def add_spf_result(self, domain: str, scope: str, result: _SPFResult):
self.spf_results.append(_SPFResult(domain=domain, scope=scope, result=result))
def save(self, **kwargs): # pyright: ignore[reportIncompatibleMethodOverride]
def save(self, **kwargs):
self.passed_dmarc = False
self.passed_dmarc = self.spf_aligned or self.dkim_aligned
@@ -378,9 +379,9 @@ def save_aggregate_report_to_opensearch(
aggregate_report: dict[str, Any],
index_suffix: Optional[str] = None,
index_prefix: Optional[str] = None,
monthly_indexes: bool = False,
number_of_shards: int = 1,
number_of_replicas: int = 0,
monthly_indexes: Optional[bool] = False,
number_of_shards: Optional[int] = 1,
number_of_replicas: Optional[int] = 0,
):
"""
Saves a parsed DMARC aggregate report to OpenSearch
@@ -426,12 +427,13 @@ def save_aggregate_report_to_opensearch(
query = org_name_query & report_id_query & domain_query
query = query & begin_date_query & end_date_query
search.query = query
begin_date_human = begin_date.strftime("%Y-%m-%d %H:%M:%SZ")
end_date_human = end_date.strftime("%Y-%m-%d %H:%M:%SZ")
try:
existing = search.execute()
except Exception as error_:
begin_date_human = begin_date.strftime("%Y-%m-%d %H:%M:%SZ")
end_date_human = end_date.strftime("%Y-%m-%d %H:%M:%SZ")
raise OpenSearchError(
"OpenSearch's search for existing report \
error: {}".format(error_.__str__())
@@ -539,7 +541,7 @@ def save_forensic_report_to_opensearch(
forensic_report: dict[str, Any],
index_suffix: Optional[str] = None,
index_prefix: Optional[str] = None,
monthly_indexes: bool = False,
monthly_indexes: Optional[bool] = False,
number_of_shards: int = 1,
number_of_replicas: int = 0,
):
@@ -567,7 +569,7 @@ def save_forensic_report_to_opensearch(
sample_date = forensic_report["parsed_sample"]["date"]
sample_date = human_timestamp_to_datetime(sample_date)
original_headers = forensic_report["parsed_sample"]["headers"]
headers: dict[str, Any] = {}
headers = dict()
for original_header in original_headers:
headers[original_header.lower()] = original_headers[original_header]
@@ -706,9 +708,9 @@ def save_smtp_tls_report_to_opensearch(
report: dict[str, Any],
index_suffix: Optional[str] = None,
index_prefix: Optional[str] = None,
monthly_indexes: bool = False,
number_of_shards: int = 1,
number_of_replicas: int = 0,
monthly_indexes: Optional[bool] = False,
number_of_shards: Optional[int] = 1,
number_of_replicas: Optional[int] = 0,
):
"""
Saves a parsed SMTP TLS report to OpenSearch

View File

@@ -2,9 +2,9 @@
from __future__ import annotations
import json
from typing import Any
import json
import boto3
from parsedmarc.log import logger
@@ -51,7 +51,7 @@ class S3Client(object):
aws_access_key_id=access_key_id,
aws_secret_access_key=secret_access_key,
)
self.bucket = self.s3.Bucket(self.bucket_name) # type: ignore
self.bucket: Any = self.s3.Bucket(self.bucket_name)
def save_aggregate_report_to_s3(self, report: dict[str, Any]):
self.save_report_to_s3(report, "aggregate")

View File

@@ -2,13 +2,15 @@
from __future__ import annotations
import json
import socket
from typing import Any, Union
from urllib.parse import urlparse
import requests
from urllib.parse import urlparse
import socket
import json
import urllib3
import requests
from parsedmarc.constants import USER_AGENT
from parsedmarc.log import logger

View File

@@ -3,11 +3,14 @@
from __future__ import annotations
import json
import logging
import logging.handlers
from typing import Any
import json
from parsedmarc import (
parsed_aggregate_reports_to_csv_rows,
parsed_forensic_reports_to_csv_rows,
@@ -32,17 +35,23 @@ class SyslogClient(object):
log_handler = logging.handlers.SysLogHandler(address=(server_name, server_port))
self.logger.addHandler(log_handler)
def save_aggregate_report_to_syslog(self, aggregate_reports: list[dict[str, Any]]):
def save_aggregate_report_to_syslog(
self, aggregate_reports: list[dict[str, Any]]
):
rows = parsed_aggregate_reports_to_csv_rows(aggregate_reports)
for row in rows:
self.logger.info(json.dumps(row))
def save_forensic_report_to_syslog(self, forensic_reports: list[dict[str, Any]]):
def save_forensic_report_to_syslog(
self, forensic_reports: list[dict[str, Any]]
):
rows = parsed_forensic_reports_to_csv_rows(forensic_reports)
for row in rows:
self.logger.info(json.dumps(row))
def save_smtp_tls_report_to_syslog(self, smtp_tls_reports: list[dict[str, Any]]):
def save_smtp_tls_report_to_syslog(
self, smtp_tls_reports: list[dict[str, Any]]
):
rows = parsed_smtp_tls_reports_to_csv_rows(smtp_tls_reports)
for row in rows:
self.logger.info(json.dumps(row))

View File

@@ -1,220 +0,0 @@
from __future__ import annotations
from typing import Any, Dict, List, Literal, Optional, TypedDict, Union
# NOTE: This module is intentionally Python 3.9 compatible.
# - No PEP 604 unions (A | B)
# - No typing.NotRequired / Required (3.11+) to avoid an extra dependency.
# For optional keys, use total=False TypedDicts.
ReportType = Literal["aggregate", "forensic", "smtp_tls"]
class AggregateReportMetadata(TypedDict):
org_name: str
org_email: str
org_extra_contact_info: Optional[str]
report_id: str
begin_date: str
end_date: str
timespan_requires_normalization: bool
original_timespan_seconds: int
errors: List[str]
class AggregatePolicyPublished(TypedDict):
domain: str
adkim: str
aspf: str
p: str
sp: str
pct: str
fo: str
class IPSourceInfo(TypedDict):
ip_address: str
country: Optional[str]
reverse_dns: Optional[str]
base_domain: Optional[str]
name: Optional[str]
type: Optional[str]
class AggregateAlignment(TypedDict):
spf: bool
dkim: bool
dmarc: bool
class AggregateIdentifiers(TypedDict):
header_from: str
envelope_from: Optional[str]
envelope_to: Optional[str]
class AggregatePolicyOverrideReason(TypedDict):
type: Optional[str]
comment: Optional[str]
class AggregateAuthResultDKIM(TypedDict):
domain: str
result: str
selector: str
class AggregateAuthResultSPF(TypedDict):
domain: str
result: str
scope: str
class AggregateAuthResults(TypedDict):
dkim: List[AggregateAuthResultDKIM]
spf: List[AggregateAuthResultSPF]
class AggregatePolicyEvaluated(TypedDict):
disposition: str
dkim: str
spf: str
policy_override_reasons: List[AggregatePolicyOverrideReason]
class AggregateRecord(TypedDict):
interval_begin: str
interval_end: str
source: IPSourceInfo
count: int
alignment: AggregateAlignment
policy_evaluated: AggregatePolicyEvaluated
disposition: str
identifiers: AggregateIdentifiers
auth_results: AggregateAuthResults
class AggregateReport(TypedDict):
xml_schema: str
report_metadata: AggregateReportMetadata
policy_published: AggregatePolicyPublished
records: List[AggregateRecord]
class EmailAddress(TypedDict):
display_name: Optional[str]
address: str
local: Optional[str]
domain: Optional[str]
class EmailAttachment(TypedDict, total=False):
filename: Optional[str]
mail_content_type: Optional[str]
sha256: Optional[str]
ParsedEmail = TypedDict(
"ParsedEmail",
{
# This is a lightly-specified version of mailsuite/mailparser JSON.
# It focuses on the fields parsedmarc uses in forensic handling.
"headers": Dict[str, Any],
"subject": Optional[str],
"filename_safe_subject": Optional[str],
"date": Optional[str],
"from": EmailAddress,
"to": List[EmailAddress],
"cc": List[EmailAddress],
"bcc": List[EmailAddress],
"attachments": List[EmailAttachment],
"body": Optional[str],
"has_defects": bool,
"defects": Any,
"defects_categories": Any,
},
total=False,
)
class ForensicReport(TypedDict):
feedback_type: Optional[str]
user_agent: Optional[str]
version: Optional[str]
original_envelope_id: Optional[str]
original_mail_from: Optional[str]
original_rcpt_to: Optional[str]
arrival_date: str
arrival_date_utc: str
authentication_results: Optional[str]
delivery_result: Optional[str]
auth_failure: List[str]
authentication_mechanisms: List[str]
dkim_domain: Optional[str]
reported_domain: str
sample_headers_only: bool
source: IPSourceInfo
sample: str
parsed_sample: ParsedEmail
class SMTPTLSFailureDetails(TypedDict):
result_type: str
failed_session_count: int
class SMTPTLSFailureDetailsOptional(SMTPTLSFailureDetails, total=False):
sending_mta_ip: str
receiving_ip: str
receiving_mx_hostname: str
receiving_mx_helo: str
additional_info_uri: str
failure_reason_code: str
ip_address: str
class SMTPTLSPolicySummary(TypedDict):
policy_domain: str
policy_type: str
successful_session_count: int
failed_session_count: int
class SMTPTLSPolicy(SMTPTLSPolicySummary, total=False):
policy_strings: List[str]
mx_host_patterns: List[str]
failure_details: List[SMTPTLSFailureDetailsOptional]
class SMTPTLSReport(TypedDict):
organization_name: str
begin_date: str
end_date: str
contact_info: Union[str, List[str]]
report_id: str
policies: List[SMTPTLSPolicy]
class AggregateParsedReport(TypedDict):
report_type: Literal["aggregate"]
report: AggregateReport
class ForensicParsedReport(TypedDict):
report_type: Literal["forensic"]
report: ForensicReport
class SMTPTLSParsedReport(TypedDict):
report_type: Literal["smtp_tls"]
report: SMTPTLSReport
ParsedReport = Union[AggregateParsedReport, ForensicParsedReport, SMTPTLSParsedReport]
class ParsingResults(TypedDict):
aggregate_reports: List[AggregateReport]
forensic_reports: List[ForensicReport]
smtp_tls_reports: List[SMTPTLSReport]

View File

@@ -4,23 +4,25 @@
from __future__ import annotations
import base64
import csv
import hashlib
import io
import json
import logging
import mailbox
import os
import re
import shutil
import subprocess
import tempfile
from datetime import datetime, timedelta, timezone
from typing import Optional, TypedDict, Union, cast
from typing import Optional, Union, TypedDict, Any
import mailparser
import logging
import os
from datetime import datetime
from datetime import timezone
from datetime import timedelta
from expiringdict import ExpiringDict
import tempfile
import subprocess
import shutil
import mailparser
import json
import hashlib
import base64
import mailbox
import re
import csv
import io
try:
from importlib.resources import files
@@ -29,19 +31,19 @@ except ImportError:
from importlib.resources import files
import dns.exception
import dns.resolver
from dateutil.parser import parse as parse_date
import dns.reversename
import dns.resolver
import dns.exception
import geoip2.database
import geoip2.errors
import publicsuffixlist
import requests
from dateutil.parser import parse as parse_date
from parsedmarc.log import logger
import parsedmarc.resources.dbip
import parsedmarc.resources.maps
from parsedmarc.constants import USER_AGENT
from parsedmarc.log import logger
parenthesis_regex = re.compile(r"\s*\(.*\)\s*")
@@ -64,21 +66,12 @@ class DownloadError(RuntimeError):
"""Raised when an error occurs when downloading a file"""
class ReverseDNSService(TypedDict):
name: str
type: Optional[str]
ReverseDNSMap = dict[str, ReverseDNSService]
class IPAddressInfo(TypedDict):
ip_address: str
reverse_dns: Optional[str]
country: Optional[str]
base_domain: Optional[str]
name: Optional[str]
type: Optional[str]
class EmailAddress(TypedDict):
"""Parsed email address information"""
display_name: Optional[str]
address: str
local: Optional[str]
domain: Optional[str]
def decode_base64(data: str) -> bytes:
@@ -92,14 +85,14 @@ def decode_base64(data: str) -> bytes:
bytes: The decoded bytes
"""
data_bytes = bytes(data, encoding="ascii")
missing_padding = len(data_bytes) % 4
data = bytes(data, encoding="ascii")
missing_padding = len(data) % 4
if missing_padding != 0:
data_bytes += b"=" * (4 - missing_padding)
return base64.b64decode(data_bytes)
data += b"=" * (4 - missing_padding)
return base64.b64decode(data)
def get_base_domain(domain: str) -> Optional[str]:
def get_base_domain(domain: str) -> str:
"""
Gets the base domain name for the given domain
@@ -128,8 +121,8 @@ def query_dns(
record_type: str,
*,
cache: Optional[ExpiringDict] = None,
nameservers: Optional[list[str]] = None,
timeout: float = 2.0,
nameservers: list[str] = None,
timeout: int = 2.0,
) -> list[str]:
"""
Queries DNS
@@ -149,9 +142,9 @@ def query_dns(
record_type = record_type.upper()
cache_key = "{0}_{1}".format(domain, record_type)
if cache:
cached_records = cache.get(cache_key, None)
if isinstance(cached_records, list):
return cast(list[str], cached_records)
records = cache.get(cache_key, None)
if records:
return records
resolver = dns.resolver.Resolver()
timeout = float(timeout)
@@ -165,12 +158,26 @@ def query_dns(
resolver.nameservers = nameservers
resolver.timeout = timeout
resolver.lifetime = timeout
records = list(
map(
lambda r: r.to_text().replace('"', "").rstrip("."),
resolver.resolve(domain, record_type, lifetime=timeout),
if record_type == "TXT":
resource_records = list(
map(
lambda r: r.strings,
resolver.resolve(domain, record_type, lifetime=timeout),
)
)
_resource_record = [
resource_record[0][:0].join(resource_record)
for resource_record in resource_records
if resource_record
]
records = [r.decode() for r in _resource_record]
else:
records = list(
map(
lambda r: r.to_text().replace('"', "").rstrip("."),
resolver.resolve(domain, record_type, lifetime=timeout),
)
)
)
if cache:
cache[cache_key] = records
@@ -181,9 +188,9 @@ def get_reverse_dns(
ip_address,
*,
cache: Optional[ExpiringDict] = None,
nameservers: Optional[list[str]] = None,
timeout: float = 2.0,
) -> Optional[str]:
nameservers: list[str] = None,
timeout: int = 2.0,
) -> str:
"""
Resolves an IP address to a hostname using a reverse DNS query
@@ -201,7 +208,7 @@ def get_reverse_dns(
try:
address = dns.reversename.from_address(ip_address)
hostname = query_dns(
str(address), "PTR", cache=cache, nameservers=nameservers, timeout=timeout
address, "PTR", cache=cache, nameservers=nameservers, timeout=timeout
)[0]
except dns.exception.DNSException as e:
@@ -238,7 +245,7 @@ def timestamp_to_human(timestamp: int) -> str:
def human_timestamp_to_datetime(
human_timestamp: str, *, to_utc: bool = False
human_timestamp: str, *, to_utc: Optional[bool] = False
) -> datetime:
"""
Converts a human-readable timestamp into a Python ``datetime`` object
@@ -269,12 +276,10 @@ def human_timestamp_to_unix_timestamp(human_timestamp: str) -> int:
float: The converted timestamp
"""
human_timestamp = human_timestamp.replace("T", " ")
return int(human_timestamp_to_datetime(human_timestamp).timestamp())
return human_timestamp_to_datetime(human_timestamp).timestamp()
def get_ip_address_country(
ip_address: str, *, db_path: Optional[str] = None
) -> Optional[str]:
def get_ip_address_country(ip_address: str, *, db_path: Optional[str] = None) -> str:
"""
Returns the ISO code for the country associated
with the given IPv4 or IPv6 address
@@ -337,14 +342,14 @@ def get_ip_address_country(
def get_service_from_reverse_dns_base_domain(
base_domain,
base_domain: str,
*,
always_use_local_file: bool = False,
always_use_local_file: Optional[bool] = False,
local_file_path: Optional[str] = None,
url: Optional[str] = None,
offline: bool = False,
reverse_dns_map: Optional[ReverseDNSMap] = None,
) -> ReverseDNSService:
offline: Optional[bool] = False,
reverse_dns_map: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
"""
Returns the service name of a given base domain name from reverse DNS.
@@ -361,6 +366,12 @@ def get_service_from_reverse_dns_base_domain(
the supplied reverse_dns_base_domain and the type will be None
"""
def load_csv(_csv_file):
reader = csv.DictReader(_csv_file)
for row in reader:
key = row["base_reverse_dns"].lower().strip()
reverse_dns_map[key] = dict(name=row["name"], type=row["type"])
base_domain = base_domain.lower().strip()
if url is None:
url = (
@@ -368,24 +379,11 @@ def get_service_from_reverse_dns_base_domain(
"/parsedmarc/master/parsedmarc/"
"resources/maps/base_reverse_dns_map.csv"
)
reverse_dns_map_value: ReverseDNSMap
if reverse_dns_map is None:
reverse_dns_map_value = {}
else:
reverse_dns_map_value = reverse_dns_map
def load_csv(_csv_file):
reader = csv.DictReader(_csv_file)
for row in reader:
key = row["base_reverse_dns"].lower().strip()
reverse_dns_map_value[key] = {
"name": row["name"],
"type": row["type"],
}
reverse_dns_map = dict()
csv_file = io.StringIO()
if not (offline or always_use_local_file) and len(reverse_dns_map_value) == 0:
if not (offline or always_use_local_file) and len(reverse_dns_map) == 0:
try:
logger.debug(f"Trying to fetch reverse DNS map from {url}...")
headers = {"User-Agent": USER_AGENT}
@@ -402,7 +400,7 @@ def get_service_from_reverse_dns_base_domain(
logging.debug("Response body:")
logger.debug(csv_file.read())
if len(reverse_dns_map_value) == 0:
if len(reverse_dns_map) == 0:
logger.info("Loading included reverse DNS map...")
path = str(
files(parsedmarc.resources.maps).joinpath("base_reverse_dns_map.csv")
@@ -411,28 +409,27 @@ def get_service_from_reverse_dns_base_domain(
path = local_file_path
with open(path) as csv_file:
load_csv(csv_file)
service: ReverseDNSService
try:
service = reverse_dns_map_value[base_domain]
service = reverse_dns_map[base_domain]
except KeyError:
service = {"name": base_domain, "type": None}
service = dict(name=base_domain, type=None)
return service
def get_ip_address_info(
ip_address,
ip_address: str,
*,
ip_db_path: Optional[str] = None,
reverse_dns_map_path: Optional[str] = None,
always_use_local_files: bool = False,
always_use_local_files: Optional[bool] = False,
reverse_dns_map_url: Optional[str] = None,
cache: Optional[ExpiringDict] = None,
reverse_dns_map: Optional[ReverseDNSMap] = None,
offline: bool = False,
reverse_dns_map: Optional[dict[str, Any]] = None,
offline: Optional[bool] = False,
nameservers: Optional[list[str]] = None,
timeout: float = 2.0,
) -> IPAddressInfo:
timeout: Optional[float] = 2.0,
) -> dict[str, Any]:
"""
Returns reverse DNS and country information for the given IP address
@@ -455,22 +452,12 @@ def get_ip_address_info(
"""
ip_address = ip_address.lower()
if cache is not None:
cached_info = cache.get(ip_address, None)
if (
cached_info
and isinstance(cached_info, dict)
and "ip_address" in cached_info
):
info = cache.get(ip_address, None)
if info:
logger.debug(f"IP address {ip_address} was found in cache")
return cast(IPAddressInfo, cached_info)
info: IPAddressInfo = {
"ip_address": ip_address,
"reverse_dns": None,
"country": None,
"base_domain": None,
"name": None,
"type": None,
}
return info
info = dict()
info["ip_address"] = ip_address
if offline:
reverse_dns = None
else:
@@ -480,6 +467,9 @@ def get_ip_address_info(
country = get_ip_address_country(ip_address, db_path=ip_db_path)
info["country"] = country
info["reverse_dns"] = reverse_dns
info["base_domain"] = None
info["name"] = None
info["type"] = None
if reverse_dns is not None:
base_domain = get_base_domain(reverse_dns)
if base_domain is not None:
@@ -504,7 +494,7 @@ def get_ip_address_info(
return info
def parse_email_address(original_address: str) -> dict[str, Optional[str]]:
def parse_email_address(original_address: str) -> EmailAddress:
if original_address[0] == "":
display_name = None
else:
@@ -568,7 +558,7 @@ def is_mbox(path: str) -> bool:
return _is_mbox
def is_outlook_msg(content) -> bool:
def is_outlook_msg(content: Union[bytes, Any]) -> bool:
"""
Checks if the given content is an Outlook msg OLE/MSG file
@@ -583,7 +573,7 @@ def is_outlook_msg(content) -> bool:
)
def convert_outlook_msg(msg_bytes: bytes) -> bytes:
def convert_outlook_msg(msg_bytes: bytes) -> str:
"""
Uses the ``msgconvert`` Perl utility to convert an Outlook MS file to
standard RFC 822 format
@@ -592,7 +582,7 @@ def convert_outlook_msg(msg_bytes: bytes) -> bytes:
msg_bytes (bytes): the content of the .msg file
Returns:
A RFC 822 bytes payload
A RFC 822 string
"""
if not is_outlook_msg(msg_bytes):
raise ValueError("The supplied bytes are not an Outlook MSG file")
@@ -601,13 +591,14 @@ def convert_outlook_msg(msg_bytes: bytes) -> bytes:
os.chdir(tmp_dir)
with open("sample.msg", "wb") as msg_file:
msg_file.write(msg_bytes)
rfc822_bytes: bytes
try:
subprocess.check_call(
["msgconvert", "sample.msg"], stdout=null_file, stderr=null_file
)
eml_path = "sample.eml"
with open(eml_path, "rb") as eml_file:
rfc822 = eml_file.read()
rfc822_bytes = eml_file.read()
except FileNotFoundError:
raise EmailParserError(
"Failed to convert Outlook MSG: msgconvert utility not found"
@@ -616,12 +607,12 @@ def convert_outlook_msg(msg_bytes: bytes) -> bytes:
os.chdir(orig_dir)
shutil.rmtree(tmp_dir)
return rfc822
return rfc822_bytes.decode("utf-8", errors="replace")
def parse_email(
data: Union[bytes, str], *, strip_attachment_payloads: bool = False
) -> dict:
data: Union[bytes, str], *, strip_attachment_payloads: Optional[bool] = False
) -> dict[str, Any]:
"""
A simplified email parser
@@ -636,7 +627,8 @@ def parse_email(
if isinstance(data, bytes):
if is_outlook_msg(data):
data = convert_outlook_msg(data)
data = data.decode("utf-8", errors="replace")
else:
data = data.decode("utf-8", errors="replace")
parsed_email = mailparser.parse_from_string(data)
headers = json.loads(parsed_email.headers_json).copy()
parsed_email = json.loads(parsed_email.mail_json).copy()

View File

@@ -48,7 +48,7 @@ dependencies = [
"imapclient>=2.1.0",
"kafka-python-ng>=2.2.2",
"lxml>=4.4.0",
"mailsuite>=1.11.1",
"mailsuite>=1.9.18",
"msgraph-core==0.2.2",
"opensearch-py>=2.4.2,<=3.0.0",
"publicsuffixlist>=0.10.0",

164
tests.py Executable file → Normal file
View File

@@ -3,7 +3,6 @@
from __future__ import absolute_import, print_function, unicode_literals
import json
import os
import unittest
from glob import glob
@@ -157,169 +156,6 @@ class Test(unittest.TestCase):
parsedmarc.parsed_smtp_tls_reports_to_csv(parsed_report)
print("Passed!")
def testGoogleSecOpsAggregateReport(self):
"""Test Google SecOps aggregate report conversion"""
print()
from parsedmarc.google_secops import GoogleSecOpsClient
client = GoogleSecOpsClient(use_stdout=True)
sample_path = "samples/aggregate/example.net!example.com!1529366400!1529452799.xml"
print("Testing Google SecOps aggregate conversion for {0}: ".format(sample_path), end="")
parsed_file = parsedmarc.parse_report_file(sample_path, always_use_local_files=True)
parsed_report = parsed_file["report"]
events = client.save_aggregate_report_to_google_secops(parsed_report)
# Verify we got events
assert len(events) > 0, "Expected at least one event"
# Verify each event is valid JSON
for event in events:
event_dict = json.loads(event)
assert "event_type" in event_dict
assert event_dict["event_type"] == "DMARC_AGGREGATE"
assert "metadata" in event_dict
assert "principal" in event_dict
assert "target" in event_dict
assert "security_result" in event_dict
print("Passed!")
def testGoogleSecOpsForensicReport(self):
"""Test Google SecOps forensic report conversion"""
print()
from parsedmarc.google_secops import GoogleSecOpsClient
# Test without payload
client = GoogleSecOpsClient(include_ruf_payload=False, use_stdout=True)
sample_path = "samples/forensic/dmarc_ruf_report_linkedin.eml"
print("Testing Google SecOps forensic conversion (no payload) for {0}: ".format(sample_path), end="")
parsed_file = parsedmarc.parse_report_file(sample_path)
parsed_report = parsed_file["report"]
events = client.save_forensic_report_to_google_secops(parsed_report)
# Verify we got events
assert len(events) > 0, "Expected at least one event"
# Verify each event is valid JSON
for event in events:
event_dict = json.loads(event)
assert "event_type" in event_dict
assert event_dict["event_type"] == "DMARC_FORENSIC"
# Verify no payload in additional fields
if "additional" in event_dict and "fields" in event_dict["additional"]:
for field in event_dict["additional"]["fields"]:
assert field["key"] != "message_sample", "Payload should not be included when disabled"
print("Passed!")
# Test with payload
client_with_payload = GoogleSecOpsClient(
include_ruf_payload=True,
ruf_payload_max_bytes=100,
use_stdout=True
)
print("Testing Google SecOps forensic conversion (with payload) for {0}: ".format(sample_path), end="")
events_with_payload = client_with_payload.save_forensic_report_to_google_secops(parsed_report)
# Verify we got events
assert len(events_with_payload) > 0, "Expected at least one event"
# Verify payload is included
for event in events_with_payload:
event_dict = json.loads(event)
# Check if message_sample is in additional fields
has_sample = False
if "additional" in event_dict and "fields" in event_dict["additional"]:
for field in event_dict["additional"]["fields"]:
if field["key"] == "message_sample":
has_sample = True
# Verify truncation: max_bytes (100) + "... [truncated]" suffix (16 chars)
# Allow some margin for the actual payload length
max_expected_length = 100 + len("... [truncated]") + 10
assert len(field["value"]) <= max_expected_length, f"Payload should be truncated, got {len(field['value'])} bytes"
break
assert has_sample, "Payload should be included when enabled"
print("Passed!")
def testGoogleSecOpsConfiguration(self):
"""Test Google SecOps client configuration"""
print()
from parsedmarc.google_secops import GoogleSecOpsClient
print("Testing Google SecOps client configuration: ", end="")
# Test stdout configuration
client1 = GoogleSecOpsClient(use_stdout=True)
assert client1.include_ruf_payload is False
assert client1.ruf_payload_max_bytes == 4096
assert client1.static_observer_vendor == "parsedmarc"
assert client1.static_observer_name is None
assert client1.static_environment is None
assert client1.use_stdout is True
# Test custom configuration
client2 = GoogleSecOpsClient(
include_ruf_payload=True,
ruf_payload_max_bytes=8192,
static_observer_name="test-observer",
static_observer_vendor="test-vendor",
static_environment="prod",
use_stdout=True
)
assert client2.include_ruf_payload is True
assert client2.ruf_payload_max_bytes == 8192
assert client2.static_observer_name == "test-observer"
assert client2.static_observer_vendor == "test-vendor"
assert client2.static_environment == "prod"
print("Passed!")
def testGoogleSecOpsSmtpTlsReport(self):
"""Test Google SecOps SMTP TLS report conversion"""
print()
from parsedmarc.google_secops import GoogleSecOpsClient
client = GoogleSecOpsClient(use_stdout=True)
sample_path = "samples/smtp_tls/rfc8460.json"
print("Testing Google SecOps SMTP TLS conversion for {0}: ".format(sample_path), end="")
parsed_file = parsedmarc.parse_report_file(sample_path)
parsed_report = parsed_file["report"]
events = client.save_smtp_tls_report_to_google_secops(parsed_report)
# Verify we got events
assert len(events) > 0, "Expected at least one event"
# Verify each event is valid JSON
for event in events:
event_dict = json.loads(event)
assert "event_type" in event_dict
assert event_dict["event_type"] == "SMTP_TLS_REPORT"
assert "metadata" in event_dict
assert "target" in event_dict
assert "security_result" in event_dict
# Verify failed_session_count is in detection_fields as an integer
found_count = False
for field in event_dict["security_result"][0]["detection_fields"]:
if field["key"] == "smtp_tls.failed_session_count":
assert isinstance(field["value"], int), "failed_session_count should be an integer"
found_count = True
break
assert found_count, "failed_session_count should be in detection_fields"
print("Passed!")
if __name__ == "__main__":
unittest.main(verbosity=2)