mirror of
https://github.com/domainaware/parsedmarc.git
synced 2026-02-18 07:26:25 +00:00
Compare commits
4 Commits
copilot/ad
...
master
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
221bc332ef | ||
|
|
a2a75f7a81 | ||
|
|
50fcb51577 | ||
|
|
dd9ef90773 |
2
.github/workflows/python-tests.yml
vendored
2
.github/workflows/python-tests.yml
vendored
@@ -30,7 +30,7 @@ jobs:
|
||||
strategy:
|
||||
fail-fast: false
|
||||
matrix:
|
||||
python-version: ["3.9", "3.10", "3.11", "3.12", "3.13"]
|
||||
python-version: ["3.9", "3.10", "3.11", "3.12", "3.13", "3.14"]
|
||||
|
||||
steps:
|
||||
- uses: actions/checkout@v5
|
||||
|
||||
@@ -1,5 +1,9 @@
|
||||
# Changelog
|
||||
|
||||
## 9.0.10
|
||||
|
||||
- Support Python 3.14+
|
||||
|
||||
## 9.0.9
|
||||
|
||||
### Fixes
|
||||
|
||||
@@ -44,7 +44,6 @@ Thanks to all
|
||||
- Optionally send the results to Elasticsearch, Opensearch, and/or Splunk, for
|
||||
use with premade dashboards
|
||||
- Optionally send reports to Apache Kafka
|
||||
- Optionally send reports to Google SecOps (Chronicle) in UDM format via API or stdout
|
||||
|
||||
## Python Compatibility
|
||||
|
||||
@@ -62,4 +61,4 @@ for RHEL or Debian.
|
||||
| 3.11 | ✅ | Actively maintained; supported until June 2028 (Debian 12) |
|
||||
| 3.12 | ✅ | Actively maintained; supported until May 2035 (RHEL 10) |
|
||||
| 3.13 | ✅ | Actively maintained; supported until June 2030 (Debian 13) |
|
||||
| 3.14 | ❌ | Not currently supported due to [this imapclient bug](https://github.com/mjs/imapclient/issues/618)|
|
||||
| 3.14 | ✅ | Actively maintained |
|
||||
|
||||
@@ -1,494 +0,0 @@
|
||||
# Google SecOps (Chronicle) Output
|
||||
|
||||
`parsedmarc` can output DMARC reports to Google SecOps (Chronicle) in UDM (Unified Data Model) format.
|
||||
|
||||
## Configuration
|
||||
|
||||
To enable Google SecOps output, add a `[google_secops]` section to your configuration file:
|
||||
|
||||
### Primary Method: Chronicle Ingestion API
|
||||
|
||||
The recommended approach is to send events directly to Chronicle via the Ingestion API:
|
||||
|
||||
```ini
|
||||
[general]
|
||||
save_aggregate = True
|
||||
save_forensic = True
|
||||
|
||||
[google_secops]
|
||||
# Required: Path to Google service account JSON credentials file
|
||||
api_credentials_file = /path/to/service-account-credentials.json
|
||||
|
||||
# Required: Chronicle customer ID
|
||||
api_customer_id = your-customer-id-here
|
||||
|
||||
# Optional: Chronicle region (default: us)
|
||||
# Options: us, europe, asia-southeast1, me-central2, australia-southeast1
|
||||
api_region = us
|
||||
|
||||
# Optional: Log type for Chronicle ingestion (default: DMARC)
|
||||
api_log_type = DMARC
|
||||
|
||||
# Optional: Include forensic report message payload (default: False)
|
||||
# For privacy, message bodies are excluded by default
|
||||
include_ruf_payload = False
|
||||
|
||||
# Optional: Maximum bytes of forensic message payload to include (default: 4096)
|
||||
ruf_payload_max_bytes = 4096
|
||||
|
||||
# Optional: Static observer name for telemetry identification
|
||||
static_observer_name = my-parsedmarc-instance
|
||||
|
||||
# Optional: Static observer vendor (default: parsedmarc)
|
||||
static_observer_vendor = parsedmarc
|
||||
|
||||
# Optional: Static environment label (e.g., prod, dev)
|
||||
static_environment = prod
|
||||
```
|
||||
|
||||
### Alternative Method: stdout Output
|
||||
|
||||
If you prefer to use an external log shipper (Fluentd, Logstash, Chronicle forwarder), set `use_stdout = True`:
|
||||
|
||||
```ini
|
||||
[google_secops]
|
||||
# Output to stdout instead of Chronicle API
|
||||
use_stdout = True
|
||||
|
||||
# Other optional configuration options (as above)
|
||||
include_ruf_payload = False
|
||||
ruf_payload_max_bytes = 4096
|
||||
static_observer_name = my-instance
|
||||
static_observer_vendor = parsedmarc
|
||||
static_environment = prod
|
||||
```
|
||||
|
||||
## Output Format
|
||||
|
||||
The Google SecOps output produces newline-delimited JSON (NDJSON) in Chronicle UDM format, which can be ingested into Google SecOps for hunting and dashboarding.
|
||||
|
||||
### Event Types
|
||||
|
||||
1. **DMARC_AGGREGATE**: One event per aggregate report row, preserving count and period information
|
||||
2. **DMARC_FORENSIC**: One event per forensic report
|
||||
3. **SMTP_TLS_REPORT**: One event per SMTP TLS failure detail
|
||||
4. **DMARC_PARSE_ERROR**: Generated when parsing fails (does not crash)
|
||||
|
||||
### UDM Schema
|
||||
|
||||
Each event includes:
|
||||
|
||||
- **metadata**: Event timestamp, type, product name, and vendor
|
||||
- `event_timestamp`: RFC 3339 formatted timestamp
|
||||
- `event_type`: Always "GENERIC_EVENT" for UDM
|
||||
- `product_name`: Always "parsedmarc"
|
||||
- `vendor_name`: Configurable via `static_observer_vendor` (default: "parsedmarc")
|
||||
- `product_deployment_id` (optional): Set via `static_observer_name` config
|
||||
|
||||
- **principal**: Source IP address, location (country), and hostname (reverse DNS)
|
||||
- `ip`: Array containing source IP address (always present)
|
||||
- `location.country_or_region` (optional): ISO country code from IP geolocation
|
||||
- `hostname` (optional): Reverse DNS hostname for source IP
|
||||
|
||||
- **target**: Domain name (from DMARC policy)
|
||||
- `domain.name`: The domain being protected by DMARC
|
||||
|
||||
- **security_result**: Severity level, description, and detection fields for dashboarding
|
||||
- `severity`: Derived severity level (HIGH/MEDIUM/LOW/ERROR)
|
||||
- `description`: Human-readable event description
|
||||
- **detection_fields**: Key DMARC dimensions for filtering and grouping
|
||||
- All dashboard-relevant fields use `dmarc.*` or `smtp_tls.*` prefixes for easy identification
|
||||
- Includes IP enrichment data (service name and type from reverse DNS mapping) for enhanced filtering
|
||||
- See "Detection Fields" section below for complete field listings
|
||||
|
||||
- **additional.fields** (optional): Low-value context fields not typically used for dashboarding
|
||||
- SPF/DKIM authentication details (e.g., `spf_0_domain`, `spf_0_result`)
|
||||
- Forensic report metadata (e.g., `feedback_type`, `message_id`, `authentication_results`)
|
||||
- Base domain, environment tags, optional message samples
|
||||
|
||||
**Design Rationale**: DMARC dimensions are placed in `security_result[].detection_fields` rather than `additional.fields` because Chronicle dashboards, stats searches, and aggregations work best with UDM label arrays. The `additional.fields` is a protobuf Struct intended for opaque context and is not reliably queryable for dashboard operations.
|
||||
|
||||
### Detection Fields
|
||||
|
||||
**Aggregate Report Fields** (`DMARC_AGGREGATE` events):
|
||||
- `dmarc.disposition`: DMARC policy action (none, quarantine, reject)
|
||||
- `dmarc.policy`: Published DMARC policy (none, quarantine, reject)
|
||||
- `dmarc.pass`: Boolean - overall DMARC authentication result
|
||||
- `dmarc.spf_aligned`: Boolean - SPF alignment status
|
||||
- `dmarc.dkim_aligned`: Boolean - DKIM alignment status
|
||||
- `dmarc.header_from`: Header From domain
|
||||
- `dmarc.envelope_from`: Envelope From domain (MAIL FROM)
|
||||
- `dmarc.report_org`: Reporting organization name
|
||||
- `dmarc.report_id`: Unique report identifier
|
||||
- `dmarc.report_begin`: Report period start timestamp
|
||||
- `dmarc.report_end`: Report period end timestamp
|
||||
- `dmarc.row_count`: Number of messages represented by this record
|
||||
- `dmarc.spf_result` (optional): SPF authentication result (pass, fail, etc.)
|
||||
- `dmarc.dkim_result` (optional): DKIM authentication result (pass, fail, etc.)
|
||||
- `dmarc.source_service_name` (optional): Enriched service name from reverse DNS mapping
|
||||
- `dmarc.source_service_type` (optional): Enriched service type (e.g., "Email Provider", "Webmail", "Marketing")
|
||||
|
||||
**Forensic Report Fields** (`DMARC_FORENSIC` events):
|
||||
- `dmarc.auth_failure`: Authentication failure type(s) (dmarc, spf, dkim)
|
||||
- `dmarc.reported_domain`: Domain that failed DMARC authentication
|
||||
- `dmarc.source_service_name` (optional): Enriched service name from reverse DNS mapping
|
||||
- `dmarc.source_service_type` (optional): Enriched service type (e.g., "Email Provider", "Webmail", "Marketing")
|
||||
|
||||
**SMTP TLS Report Fields** (`SMTP_TLS_REPORT` events):
|
||||
- `smtp_tls.policy_domain`: Domain being monitored for TLS policy
|
||||
- `smtp_tls.result_type`: Type of TLS failure (certificate-expired, validation-failure, etc.)
|
||||
- `smtp_tls.failed_session_count`: Number of failed sessions
|
||||
- `smtp_tls.report_org`: Reporting organization name
|
||||
- `smtp_tls.report_begin`: Report period start timestamp
|
||||
- `smtp_tls.report_end`: Report period end timestamp
|
||||
|
||||
### Severity Heuristics
|
||||
|
||||
- **HIGH**: DMARC disposition = reject
|
||||
- **MEDIUM**: DMARC disposition = quarantine with partial SPF/DKIM failures
|
||||
- **LOW**: DMARC disposition = none or pass
|
||||
|
||||
## Example Output
|
||||
|
||||
### Aggregate Report Event
|
||||
|
||||
```json
|
||||
{
|
||||
"event_type": "DMARC_AGGREGATE",
|
||||
"metadata": {
|
||||
"event_timestamp": "2018-06-19T00:00:00+00:00",
|
||||
"event_type": "GENERIC_EVENT",
|
||||
"product_name": "parsedmarc",
|
||||
"vendor_name": "parsedmarc"
|
||||
},
|
||||
"principal": {
|
||||
"ip": ["199.230.200.36"],
|
||||
"location": {"country_or_region": "US"}
|
||||
},
|
||||
"target": {
|
||||
"domain": {"name": "example.com"}
|
||||
},
|
||||
"security_result": [{
|
||||
"severity": "LOW",
|
||||
"description": "DMARC fail; SPF=pass; DKIM=pass; SPF not aligned; DKIM not aligned; disposition=none",
|
||||
"detection_fields": [
|
||||
{"key": "dmarc.disposition", "value": "none"},
|
||||
{"key": "dmarc.policy", "value": "none"},
|
||||
{"key": "dmarc.pass", "value": false},
|
||||
{"key": "dmarc.spf_aligned", "value": false},
|
||||
{"key": "dmarc.dkim_aligned", "value": false},
|
||||
{"key": "dmarc.header_from", "value": "example.com"},
|
||||
{"key": "dmarc.envelope_from", "value": "example.com"},
|
||||
{"key": "dmarc.report_org", "value": "example.net"},
|
||||
{"key": "dmarc.report_id", "value": "b043f0e264cf4ea995e93765242f6dfb"},
|
||||
{"key": "dmarc.report_begin", "value": "2018-06-19 00:00:00"},
|
||||
{"key": "dmarc.report_end", "value": "2018-06-19 23:59:59"},
|
||||
{"key": "dmarc.row_count", "value": 1},
|
||||
{"key": "dmarc.spf_result", "value": "pass"},
|
||||
{"key": "dmarc.dkim_result", "value": "pass"},
|
||||
{"key": "dmarc.source_service_name", "value": "Example Mail Service"},
|
||||
{"key": "dmarc.source_service_type", "value": "Email Provider"}
|
||||
]
|
||||
}],
|
||||
"additional": {
|
||||
"fields": [
|
||||
{"key": "spf_0_domain", "value": "example.com"},
|
||||
{"key": "spf_0_result", "value": "pass"},
|
||||
{"key": "dkim_0_domain", "value": "example.com"},
|
||||
{"key": "dkim_0_result", "value": "pass"}
|
||||
]
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### Forensic Report Event
|
||||
|
||||
```json
|
||||
{
|
||||
"event_type": "DMARC_FORENSIC",
|
||||
"metadata": {
|
||||
"event_timestamp": "2019-04-30T02:09:00+00:00",
|
||||
"event_type": "GENERIC_EVENT",
|
||||
"product_name": "parsedmarc",
|
||||
"vendor_name": "parsedmarc"
|
||||
},
|
||||
"principal": {
|
||||
"ip": ["10.10.10.10"],
|
||||
"location": {"country_or_region": "US"},
|
||||
"hostname": "mail.example-sender.com"
|
||||
},
|
||||
"target": {
|
||||
"domain": {"name": "example.com"}
|
||||
},
|
||||
"security_result": [{
|
||||
"severity": "MEDIUM",
|
||||
"description": "DMARC forensic report: authentication failure (dmarc)",
|
||||
"detection_fields": [
|
||||
{"key": "dmarc.auth_failure", "value": "dmarc"},
|
||||
{"key": "dmarc.reported_domain", "value": "example.com"},
|
||||
{"key": "dmarc.source_service_name", "value": "Example Mail Provider"},
|
||||
{"key": "dmarc.source_service_type", "value": "Email Provider"}
|
||||
]
|
||||
}],
|
||||
"additional": {
|
||||
"fields": [
|
||||
{"key": "feedback_type", "value": "auth-failure"},
|
||||
{"key": "message_id", "value": "<01010101010101010101010101010101@ABAB01MS0016.someserver.loc>"},
|
||||
{"key": "authentication_results", "value": "dmarc=fail (p=none; dis=none) header.from=example.com"},
|
||||
{"key": "delivery_result", "value": "delivered"}
|
||||
]
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### SMTP TLS Report Event
|
||||
|
||||
```json
|
||||
{
|
||||
"event_type": "SMTP_TLS_REPORT",
|
||||
"metadata": {
|
||||
"event_timestamp": "2016-04-01T00:00:00+00:00",
|
||||
"event_type": "GENERIC_EVENT",
|
||||
"product_name": "parsedmarc",
|
||||
"vendor_name": "parsedmarc"
|
||||
},
|
||||
"target": {
|
||||
"domain": {
|
||||
"name": "company-y.example"
|
||||
}
|
||||
},
|
||||
"security_result": [{
|
||||
"severity": "LOW",
|
||||
"description": "SMTP TLS failure: certificate-expired",
|
||||
"detection_fields": [
|
||||
{"key": "smtp_tls.policy_domain", "value": "company-y.example"},
|
||||
{"key": "smtp_tls.result_type", "value": "certificate-expired"},
|
||||
{"key": "smtp_tls.failed_session_count", "value": 100},
|
||||
{"key": "smtp_tls.report_org", "value": "Company-X"},
|
||||
{"key": "smtp_tls.report_begin", "value": "2016-04-01T00:00:00Z"},
|
||||
{"key": "smtp_tls.report_end", "value": "2016-04-01T23:59:59Z"}
|
||||
]
|
||||
}],
|
||||
"principal": {
|
||||
"ip": ["2001:db8:abcd:0012::1"]
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### Parse Error Event
|
||||
|
||||
```json
|
||||
{
|
||||
"event_type": "DMARC_PARSE_ERROR",
|
||||
"metadata": {
|
||||
"event_timestamp": "2026-01-09T16:22:10.933751+00:00",
|
||||
"event_type": "GENERIC_EVENT",
|
||||
"product_name": "parsedmarc",
|
||||
"vendor_name": "parsedmarc"
|
||||
},
|
||||
"security_result": [{
|
||||
"severity": "ERROR",
|
||||
"description": "Failed to parse DMARC report: Invalid XML structure"
|
||||
}]
|
||||
}
|
||||
```
|
||||
|
||||
## Google SecOps Searches
|
||||
|
||||
Here are some example YARA-L rules you can use in Google SecOps to hunt for DMARC issues:
|
||||
|
||||
### Find all DMARC aggregate report failures
|
||||
|
||||
```yara-l
|
||||
rule dmarc_aggregate_failures {
|
||||
meta:
|
||||
author = "parsedmarc"
|
||||
description = "Detect DMARC authentication failures in aggregate reports"
|
||||
|
||||
events:
|
||||
$e.metadata.product_name = "parsedmarc"
|
||||
$e.event_type = "DMARC_AGGREGATE"
|
||||
$e.security_result.detection_fields.key = "dmarc.pass"
|
||||
$e.security_result.detection_fields.value = false
|
||||
|
||||
condition:
|
||||
$e
|
||||
}
|
||||
```
|
||||
|
||||
### Find high severity DMARC events (rejected mail)
|
||||
|
||||
```yara-l
|
||||
rule high_severity_dmarc_events {
|
||||
meta:
|
||||
author = "parsedmarc"
|
||||
description = "Detect high severity DMARC aggregate events (rejected mail)"
|
||||
|
||||
events:
|
||||
$e.metadata.product_name = "parsedmarc"
|
||||
$e.event_type = "DMARC_AGGREGATE"
|
||||
$e.security_result.severity = "HIGH"
|
||||
|
||||
condition:
|
||||
$e
|
||||
}
|
||||
```
|
||||
|
||||
### Find repeated DMARC failures from same source IP
|
||||
|
||||
```yara-l
|
||||
rule repeated_dmarc_failures {
|
||||
meta:
|
||||
author = "parsedmarc"
|
||||
description = "Detect repeated DMARC failures from the same source IP"
|
||||
|
||||
events:
|
||||
$e.metadata.product_name = "parsedmarc"
|
||||
$e.event_type = "DMARC_AGGREGATE"
|
||||
$e.security_result.detection_fields.key = "dmarc.pass"
|
||||
$e.security_result.detection_fields.value = false
|
||||
$e.principal.ip = $source_ip
|
||||
|
||||
match:
|
||||
$source_ip over 1h
|
||||
|
||||
condition:
|
||||
#e > 5
|
||||
}
|
||||
```
|
||||
|
||||
### Find DMARC forensic reports with authentication failures
|
||||
|
||||
```yara-l
|
||||
rule dmarc_forensic_failures {
|
||||
meta:
|
||||
author = "parsedmarc"
|
||||
description = "Detect DMARC forensic reports with authentication failures"
|
||||
|
||||
events:
|
||||
$e.metadata.product_name = "parsedmarc"
|
||||
$e.event_type = "DMARC_FORENSIC"
|
||||
$e.security_result.detection_fields.key = "dmarc.auth_failure"
|
||||
|
||||
condition:
|
||||
$e
|
||||
}
|
||||
```
|
||||
|
||||
### Find DMARC failures from specific mail service types
|
||||
|
||||
```yara-l
|
||||
rule dmarc_failures_by_service_type {
|
||||
meta:
|
||||
author = "parsedmarc"
|
||||
description = "Detect DMARC failures from specific mail service types"
|
||||
|
||||
events:
|
||||
$e.metadata.product_name = "parsedmarc"
|
||||
$e.event_type = "DMARC_AGGREGATE"
|
||||
$e.security_result.detection_fields.key = "dmarc.pass"
|
||||
$e.security_result.detection_fields.value = false
|
||||
$e.security_result.detection_fields.key = "dmarc.source_service_type"
|
||||
$e.security_result.detection_fields.value = "Email Provider"
|
||||
|
||||
condition:
|
||||
$e
|
||||
}
|
||||
```
|
||||
|
||||
### Find SMTP TLS failures
|
||||
|
||||
```yara-l
|
||||
rule smtp_tls_failures {
|
||||
meta:
|
||||
author = "parsedmarc"
|
||||
description = "Detect SMTP TLS failures"
|
||||
|
||||
events:
|
||||
$e.metadata.product_name = "parsedmarc"
|
||||
$e.event_type = "SMTP_TLS_REPORT"
|
||||
|
||||
condition:
|
||||
$e
|
||||
}
|
||||
```
|
||||
|
||||
## Privacy Considerations
|
||||
|
||||
By default, forensic report message bodies are **excluded** from the output to protect privacy. If you need to include message samples for investigation:
|
||||
|
||||
1. Set `include_ruf_payload = True` in your configuration
|
||||
2. Adjust `ruf_payload_max_bytes` to limit the amount of data included (default: 4096 bytes)
|
||||
3. Message samples will be truncated if they exceed the configured maximum
|
||||
|
||||
**Note**: Be aware of data privacy regulations (GDPR, CCPA, etc.) when including message payloads in security telemetry.
|
||||
|
||||
## Usage
|
||||
|
||||
The Google SecOps output works with all parsedmarc input methods, including file processing and mailbox monitoring.
|
||||
|
||||
### Primary Method: Direct API Ingestion
|
||||
|
||||
With Chronicle Ingestion API configured, events are sent directly to Chronicle:
|
||||
|
||||
```bash
|
||||
# Process files - events are sent to Chronicle API automatically
|
||||
parsedmarc -c config.ini samples/aggregate/*.xml
|
||||
|
||||
# Monitor mailbox - events are sent to Chronicle API in real-time
|
||||
parsedmarc -c config.ini
|
||||
```
|
||||
|
||||
No additional log shippers or pipelines are needed. The Google SecOps client handles authentication and batching automatically.
|
||||
|
||||
### Alternative Method: stdout Output with Log Shipper
|
||||
|
||||
If using `use_stdout = True` in your configuration, output DMARC reports to an external log shipper:
|
||||
|
||||
#### Processing Files
|
||||
|
||||
```bash
|
||||
# Output to stdout
|
||||
parsedmarc -c config.ini samples/aggregate/*.xml > dmarc_events.ndjson
|
||||
|
||||
# Stream to file
|
||||
parsedmarc -c config.ini samples/aggregate/*.xml >> /var/log/dmarc/events.ndjson
|
||||
|
||||
# Pipe to log shipper (e.g., Fluentd, Logstash, Chronicle forwarder)
|
||||
parsedmarc -c config.ini samples/aggregate/*.xml | fluentd
|
||||
```
|
||||
|
||||
#### Monitoring Mailboxes
|
||||
|
||||
The Google SecOps output automatically works when monitoring mailboxes via IMAP, Microsoft Graph, or Gmail API. Configure your mailbox connection and enable watching:
|
||||
|
||||
```ini
|
||||
[general]
|
||||
save_aggregate = True
|
||||
save_forensic = True
|
||||
|
||||
[mailbox]
|
||||
watch = True
|
||||
delete = False
|
||||
batch_size = 10
|
||||
|
||||
[imap]
|
||||
host = imap.example.com
|
||||
user = dmarc@example.com
|
||||
password = yourpassword
|
||||
|
||||
[google_secops]
|
||||
# Use stdout mode for log shipper integration
|
||||
use_stdout = True
|
||||
include_ruf_payload = False
|
||||
static_observer_name = mailbox-monitor
|
||||
static_environment = prod
|
||||
```
|
||||
|
||||
When watching a mailbox with stdout mode, parsedmarc continuously outputs UDM events as new reports arrive:
|
||||
|
||||
```bash
|
||||
parsedmarc -c config.ini | fluentd
|
||||
```
|
||||
|
||||
The output is in newline-delimited JSON format, with one UDM event per line, ready for collection by your log shipper.
|
||||
@@ -44,7 +44,6 @@ and Valimail.
|
||||
- Optionally send the results to Elasticsearch, Opensearch, and/or Splunk, for use
|
||||
with premade dashboards
|
||||
- Optionally send reports to Apache Kafka
|
||||
- Optionally send reports to Google SecOps (Chronicle) in UDM format
|
||||
|
||||
## Python Compatibility
|
||||
|
||||
@@ -62,7 +61,7 @@ for RHEL or Debian.
|
||||
| 3.11 | ✅ | Actively maintained; supported until June 2028 (Debian 12) |
|
||||
| 3.12 | ✅ | Actively maintained; supported until May 2035 (RHEL 10) |
|
||||
| 3.13 | ✅ | Actively maintained; supported until June 2030 (Debian 13) |
|
||||
| 3.14 | ❌ | Not currently supported due to [this imapclient bug](https://github.com/mjs/imapclient/issues/618)|
|
||||
| 3.14 | ✅ | Actively maintained |
|
||||
|
||||
```{toctree}
|
||||
:caption: 'Contents'
|
||||
@@ -75,7 +74,6 @@ elasticsearch
|
||||
opensearch
|
||||
kibana
|
||||
splunk
|
||||
google_secops
|
||||
davmail
|
||||
dmarc
|
||||
contributing
|
||||
|
||||
File diff suppressed because one or more lines are too long
@@ -751,8 +751,8 @@ def parse_aggregate_report_xml(
|
||||
new_report_metadata["report_id"] = report_id
|
||||
date_range = report["report_metadata"]["date_range"]
|
||||
|
||||
begin_ts = int(date_range["begin"])
|
||||
end_ts = int(date_range["end"])
|
||||
begin_ts = int(date_range["begin"].split(".")[0])
|
||||
end_ts = int(date_range["end"].split(".")[0])
|
||||
span_seconds = end_ts - begin_ts
|
||||
|
||||
normalize_timespan = span_seconds > normalize_timespan_threshold_hours * 3600
|
||||
|
||||
@@ -27,7 +27,6 @@ from parsedmarc import (
|
||||
gelf,
|
||||
get_dmarc_reports_from_mailbox,
|
||||
get_dmarc_reports_from_mbox,
|
||||
google_secops,
|
||||
kafkaclient,
|
||||
loganalytics,
|
||||
opensearch,
|
||||
@@ -285,14 +284,6 @@ def _main():
|
||||
except Exception as error_:
|
||||
logger.error("GELF Error: {0}".format(error_.__str__()))
|
||||
|
||||
try:
|
||||
if opts.google_secops:
|
||||
events = google_secops_client.save_aggregate_report_to_google_secops(report)
|
||||
for event in events:
|
||||
print(event)
|
||||
except Exception as error_:
|
||||
logger.error("Google SecOps Error: {0}".format(error_.__str__()))
|
||||
|
||||
try:
|
||||
if opts.webhook_aggregate_url:
|
||||
indent_value = 2 if opts.prettify_json else None
|
||||
@@ -378,14 +369,6 @@ def _main():
|
||||
except Exception as error_:
|
||||
logger.error("GELF Error: {0}".format(error_.__str__()))
|
||||
|
||||
try:
|
||||
if opts.google_secops:
|
||||
events = google_secops_client.save_forensic_report_to_google_secops(report)
|
||||
for event in events:
|
||||
print(event)
|
||||
except Exception as error_:
|
||||
logger.error("Google SecOps Error: {0}".format(error_.__str__()))
|
||||
|
||||
try:
|
||||
if opts.webhook_forensic_url:
|
||||
indent_value = 2 if opts.prettify_json else None
|
||||
@@ -471,14 +454,6 @@ def _main():
|
||||
except Exception as error_:
|
||||
logger.error("GELF Error: {0}".format(error_.__str__()))
|
||||
|
||||
try:
|
||||
if opts.google_secops:
|
||||
events = google_secops_client.save_smtp_tls_report_to_google_secops(report)
|
||||
for event in events:
|
||||
print(event)
|
||||
except Exception as error_:
|
||||
logger.error("Google SecOps Error: {0}".format(error_.__str__()))
|
||||
|
||||
try:
|
||||
if opts.webhook_smtp_tls_url:
|
||||
indent_value = 2 if opts.prettify_json else None
|
||||
@@ -747,17 +722,6 @@ def _main():
|
||||
gelf_host=None,
|
||||
gelf_port=None,
|
||||
gelf_mode=None,
|
||||
google_secops=False,
|
||||
google_secops_include_ruf_payload=False,
|
||||
google_secops_ruf_payload_max_bytes=4096,
|
||||
google_secops_static_observer_name=None,
|
||||
google_secops_static_observer_vendor="parsedmarc",
|
||||
google_secops_static_environment=None,
|
||||
google_secops_api_credentials_file=None,
|
||||
google_secops_api_customer_id=None,
|
||||
google_secops_api_region="us",
|
||||
google_secops_api_log_type="DMARC",
|
||||
google_secops_use_stdout=False,
|
||||
webhook_aggregate_url=None,
|
||||
webhook_forensic_url=None,
|
||||
webhook_smtp_tls_url=None,
|
||||
@@ -1337,50 +1301,6 @@ def _main():
|
||||
logger.critical("mode setting missing from the gelf config section")
|
||||
exit(-1)
|
||||
|
||||
if "google_secops" in config.sections():
|
||||
google_secops_config = config["google_secops"]
|
||||
opts.google_secops = True
|
||||
if "include_ruf_payload" in google_secops_config:
|
||||
opts.google_secops_include_ruf_payload = bool(
|
||||
google_secops_config.getboolean("include_ruf_payload")
|
||||
)
|
||||
if "ruf_payload_max_bytes" in google_secops_config:
|
||||
opts.google_secops_ruf_payload_max_bytes = google_secops_config.getint(
|
||||
"ruf_payload_max_bytes"
|
||||
)
|
||||
if "static_observer_name" in google_secops_config:
|
||||
opts.google_secops_static_observer_name = google_secops_config[
|
||||
"static_observer_name"
|
||||
]
|
||||
if "static_observer_vendor" in google_secops_config:
|
||||
opts.google_secops_static_observer_vendor = google_secops_config[
|
||||
"static_observer_vendor"
|
||||
]
|
||||
if "static_environment" in google_secops_config:
|
||||
opts.google_secops_static_environment = google_secops_config[
|
||||
"static_environment"
|
||||
]
|
||||
if "api_credentials_file" in google_secops_config:
|
||||
opts.google_secops_api_credentials_file = google_secops_config[
|
||||
"api_credentials_file"
|
||||
]
|
||||
if "api_customer_id" in google_secops_config:
|
||||
opts.google_secops_api_customer_id = google_secops_config[
|
||||
"api_customer_id"
|
||||
]
|
||||
if "api_region" in google_secops_config:
|
||||
opts.google_secops_api_region = google_secops_config[
|
||||
"api_region"
|
||||
]
|
||||
if "api_log_type" in google_secops_config:
|
||||
opts.google_secops_api_log_type = google_secops_config[
|
||||
"api_log_type"
|
||||
]
|
||||
if "use_stdout" in google_secops_config:
|
||||
opts.google_secops_use_stdout = google_secops_config.getboolean(
|
||||
"use_stdout"
|
||||
)
|
||||
|
||||
if "webhook" in config.sections():
|
||||
webhook_config = config["webhook"]
|
||||
if "aggregate_url" in webhook_config:
|
||||
@@ -1559,23 +1479,6 @@ def _main():
|
||||
except Exception as error_:
|
||||
logger.error("GELF Error: {0}".format(error_.__str__()))
|
||||
|
||||
if opts.google_secops:
|
||||
try:
|
||||
google_secops_client = google_secops.GoogleSecOpsClient(
|
||||
include_ruf_payload=opts.google_secops_include_ruf_payload,
|
||||
ruf_payload_max_bytes=opts.google_secops_ruf_payload_max_bytes,
|
||||
static_observer_name=opts.google_secops_static_observer_name,
|
||||
static_observer_vendor=opts.google_secops_static_observer_vendor,
|
||||
static_environment=opts.google_secops_static_environment,
|
||||
api_credentials_file=opts.google_secops_api_credentials_file,
|
||||
api_customer_id=opts.google_secops_api_customer_id,
|
||||
api_region=opts.google_secops_api_region,
|
||||
api_log_type=opts.google_secops_api_log_type,
|
||||
use_stdout=opts.google_secops_use_stdout,
|
||||
)
|
||||
except Exception as error_:
|
||||
logger.error("Google SecOps Error: {0}".format(error_.__str__()))
|
||||
|
||||
if (
|
||||
opts.webhook_aggregate_url
|
||||
or opts.webhook_forensic_url
|
||||
|
||||
@@ -1,3 +1,3 @@
|
||||
__version__ = "9.0.9"
|
||||
__version__ = "9.0.10"
|
||||
|
||||
USER_AGENT = f"parsedmarc/{__version__}"
|
||||
|
||||
@@ -1,709 +0,0 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
"""Google SecOps (Chronicle) output module for parsedmarc"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from datetime import datetime, timezone
|
||||
from typing import Any, Optional
|
||||
|
||||
import requests
|
||||
from google.auth.transport.requests import Request
|
||||
from google.oauth2 import service_account
|
||||
|
||||
from parsedmarc.constants import USER_AGENT
|
||||
from parsedmarc.log import logger
|
||||
from parsedmarc.utils import human_timestamp_to_datetime
|
||||
|
||||
|
||||
class GoogleSecOpsError(RuntimeError):
|
||||
"""Raised when a Google SecOps API error occurs"""
|
||||
|
||||
|
||||
class GoogleSecOpsClient:
|
||||
"""A client for Google SecOps (Chronicle) UDM output"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
include_ruf_payload: bool = False,
|
||||
ruf_payload_max_bytes: int = 4096,
|
||||
static_observer_name: Optional[str] = None,
|
||||
static_observer_vendor: str = "parsedmarc",
|
||||
static_environment: Optional[str] = None,
|
||||
api_credentials_file: Optional[str] = None,
|
||||
api_customer_id: Optional[str] = None,
|
||||
api_region: str = "us",
|
||||
api_log_type: str = "DMARC",
|
||||
use_stdout: bool = False,
|
||||
):
|
||||
"""
|
||||
Initializes the GoogleSecOpsClient
|
||||
|
||||
Args:
|
||||
include_ruf_payload: Include RUF message payload in output
|
||||
ruf_payload_max_bytes: Maximum bytes of RUF payload to include
|
||||
static_observer_name: Static observer name for telemetry
|
||||
static_observer_vendor: Static observer vendor (default: parsedmarc)
|
||||
static_environment: Static environment (prod/dev/custom string)
|
||||
api_credentials_file: Path to Google service account JSON credentials
|
||||
api_customer_id: Chronicle customer ID (required for API)
|
||||
api_region: Chronicle region (us, europe, asia-southeast1, etc.)
|
||||
api_log_type: Log type for Chronicle ingestion (default: DMARC)
|
||||
use_stdout: Output to stdout instead of API (default: False)
|
||||
"""
|
||||
self.include_ruf_payload = include_ruf_payload
|
||||
self.ruf_payload_max_bytes = ruf_payload_max_bytes
|
||||
self.static_observer_name = static_observer_name
|
||||
self.static_observer_vendor = static_observer_vendor
|
||||
self.static_environment = static_environment
|
||||
self.use_stdout = use_stdout
|
||||
|
||||
# API configuration
|
||||
self.api_credentials_file = api_credentials_file
|
||||
self.api_customer_id = api_customer_id
|
||||
self.api_region = api_region
|
||||
self.api_log_type = api_log_type
|
||||
self.credentials = None
|
||||
self.session = None
|
||||
|
||||
# Initialize API client if not using stdout
|
||||
if not self.use_stdout:
|
||||
if not self.api_credentials_file or not self.api_customer_id:
|
||||
raise GoogleSecOpsError(
|
||||
"api_credentials_file and api_customer_id are required when not using stdout. "
|
||||
"Set use_stdout=True to output to stdout instead."
|
||||
)
|
||||
self._initialize_api_client()
|
||||
|
||||
def _initialize_api_client(self):
|
||||
"""Initialize the Chronicle API client with authentication"""
|
||||
try:
|
||||
logger.debug("Initializing Chronicle API client")
|
||||
|
||||
# Load service account credentials
|
||||
self.credentials = service_account.Credentials.from_service_account_file(
|
||||
self.api_credentials_file,
|
||||
scopes=["https://www.googleapis.com/auth/cloud-platform"],
|
||||
)
|
||||
|
||||
# Create session with authentication
|
||||
self.session = requests.Session()
|
||||
self.session.headers.update({"User-Agent": USER_AGENT})
|
||||
|
||||
logger.info("Chronicle API client initialized successfully")
|
||||
except Exception as e:
|
||||
raise GoogleSecOpsError(f"Failed to initialize Chronicle API client: {e}")
|
||||
|
||||
def _get_api_endpoint(self) -> str:
|
||||
"""Get the Chronicle Ingestion API endpoint based on region"""
|
||||
return f"https://{self.api_region}-chronicle.googleapis.com/v1alpha/projects/{self.api_customer_id}/locations/{self.api_region}/instances/default/logTypes/{self.api_log_type}/logs:import"
|
||||
|
||||
def _send_events_to_api(self, events: list[str]) -> None:
|
||||
"""
|
||||
Send UDM events to Chronicle Ingestion API
|
||||
|
||||
Args:
|
||||
events: List of NDJSON event strings
|
||||
"""
|
||||
if not events:
|
||||
return
|
||||
|
||||
try:
|
||||
# Refresh credentials if needed
|
||||
if not self.credentials.valid:
|
||||
self.credentials.refresh(Request())
|
||||
|
||||
# Prepare request
|
||||
endpoint = self._get_api_endpoint()
|
||||
headers = {
|
||||
"Authorization": f"Bearer {self.credentials.token}",
|
||||
"Content-Type": "application/json",
|
||||
}
|
||||
|
||||
# Chronicle expects events in inline_source format
|
||||
# Each event should be a separate log entry
|
||||
payload = {
|
||||
"inline_source": {
|
||||
"logs": [json.loads(event) for event in events]
|
||||
}
|
||||
}
|
||||
|
||||
logger.debug(f"Sending {len(events)} events to Chronicle API")
|
||||
|
||||
response = self.session.post(
|
||||
endpoint,
|
||||
headers=headers,
|
||||
json=payload,
|
||||
timeout=60,
|
||||
)
|
||||
|
||||
if response.status_code == 200:
|
||||
logger.info(f"Successfully sent {len(events)} events to Chronicle")
|
||||
else:
|
||||
error_msg = f"Chronicle API error: {response.status_code} - {response.text}"
|
||||
logger.error(error_msg)
|
||||
raise GoogleSecOpsError(error_msg)
|
||||
|
||||
except GoogleSecOpsError:
|
||||
raise
|
||||
except Exception as e:
|
||||
raise GoogleSecOpsError(f"Failed to send events to Chronicle API: {e}")
|
||||
|
||||
def _output_events(self, events: list[str]) -> None:
|
||||
"""
|
||||
Output events either to stdout or Chronicle API
|
||||
|
||||
Args:
|
||||
events: List of NDJSON event strings
|
||||
"""
|
||||
if self.use_stdout:
|
||||
# Output to stdout for collection by external log shippers
|
||||
for event in events:
|
||||
print(event)
|
||||
else:
|
||||
# Send directly to Chronicle API
|
||||
self._send_events_to_api(events)
|
||||
|
||||
def _get_severity(self, disposition: str, spf_aligned: bool, dkim_aligned: bool) -> str:
|
||||
"""
|
||||
Derive severity from DMARC disposition and alignment
|
||||
|
||||
Args:
|
||||
disposition: DMARC policy disposition
|
||||
spf_aligned: Whether SPF is aligned
|
||||
dkim_aligned: Whether DKIM is aligned
|
||||
|
||||
Returns:
|
||||
Severity level: HIGH, MEDIUM, or LOW
|
||||
"""
|
||||
if disposition == "reject":
|
||||
return "HIGH"
|
||||
elif disposition == "quarantine" and not (spf_aligned or dkim_aligned):
|
||||
return "MEDIUM"
|
||||
else:
|
||||
return "LOW"
|
||||
|
||||
def _get_description(
|
||||
self,
|
||||
dmarc_pass: bool,
|
||||
spf_result: Optional[str],
|
||||
dkim_result: Optional[str],
|
||||
spf_aligned: bool,
|
||||
dkim_aligned: bool,
|
||||
disposition: str,
|
||||
) -> str:
|
||||
"""
|
||||
Generate description for the event
|
||||
|
||||
Args:
|
||||
dmarc_pass: Whether DMARC passed
|
||||
spf_result: SPF result
|
||||
dkim_result: DKIM result
|
||||
spf_aligned: Whether SPF is aligned
|
||||
dkim_aligned: Whether DKIM is aligned
|
||||
disposition: DMARC disposition
|
||||
|
||||
Returns:
|
||||
Human-readable description
|
||||
"""
|
||||
parts = []
|
||||
|
||||
if dmarc_pass:
|
||||
parts.append("DMARC pass")
|
||||
else:
|
||||
parts.append("DMARC fail")
|
||||
|
||||
if spf_result:
|
||||
parts.append(f"SPF={spf_result}")
|
||||
if dkim_result:
|
||||
parts.append(f"DKIM={dkim_result}")
|
||||
|
||||
if spf_aligned:
|
||||
parts.append("SPF aligned")
|
||||
elif spf_result:
|
||||
parts.append("SPF not aligned")
|
||||
|
||||
if dkim_aligned:
|
||||
parts.append("DKIM aligned")
|
||||
elif dkim_result:
|
||||
parts.append("DKIM not aligned")
|
||||
|
||||
parts.append(f"disposition={disposition}")
|
||||
|
||||
return "; ".join(parts)
|
||||
|
||||
def _format_timestamp(self, timestamp_str: str) -> str:
|
||||
"""
|
||||
Convert parsedmarc timestamp to RFC 3339 format
|
||||
|
||||
Args:
|
||||
timestamp_str: Timestamp string from parsedmarc
|
||||
|
||||
Returns:
|
||||
RFC 3339 formatted timestamp
|
||||
"""
|
||||
try:
|
||||
dt = human_timestamp_to_datetime(timestamp_str)
|
||||
# Ensure timezone-aware datetime
|
||||
if dt.tzinfo is None:
|
||||
dt = dt.replace(tzinfo=timezone.utc)
|
||||
return dt.isoformat()
|
||||
except Exception:
|
||||
# Fallback to current time if parsing fails
|
||||
return datetime.now(timezone.utc).isoformat()
|
||||
|
||||
def save_aggregate_report_to_google_secops(
|
||||
self, aggregate_report: dict[str, Any]
|
||||
) -> list[str]:
|
||||
"""
|
||||
Convert aggregate DMARC report to Google SecOps UDM format and send to Chronicle
|
||||
|
||||
When use_stdout=False: Events are sent to Chronicle API, returns empty list
|
||||
When use_stdout=True: Returns list of NDJSON event strings for stdout
|
||||
|
||||
Args:
|
||||
aggregate_report: Aggregate report dictionary from parsedmarc
|
||||
|
||||
Returns:
|
||||
List of NDJSON event strings (empty if sent to API)
|
||||
"""
|
||||
logger.debug("Converting aggregate report to Google SecOps UDM format")
|
||||
events = []
|
||||
|
||||
try:
|
||||
report_metadata = aggregate_report["report_metadata"]
|
||||
policy_published = aggregate_report["policy_published"]
|
||||
|
||||
for record in aggregate_report["records"]:
|
||||
# Extract values
|
||||
source_ip = record["source"]["ip_address"]
|
||||
source_country = record["source"].get("country")
|
||||
source_reverse_dns = record["source"].get("reverse_dns")
|
||||
source_base_domain = record["source"].get("base_domain")
|
||||
source_name = record["source"].get("name")
|
||||
source_type = record["source"].get("type")
|
||||
|
||||
header_from = record["identifiers"]["header_from"]
|
||||
envelope_from = record["identifiers"]["envelope_from"]
|
||||
|
||||
disposition = record["policy_evaluated"]["disposition"]
|
||||
spf_aligned = record["alignment"]["spf"]
|
||||
dkim_aligned = record["alignment"]["dkim"]
|
||||
dmarc_pass = record["alignment"]["dmarc"]
|
||||
|
||||
count = record["count"]
|
||||
interval_begin = record["interval_begin"]
|
||||
|
||||
# Get auth results
|
||||
spf_results = record["auth_results"].get("spf", [])
|
||||
dkim_results = record["auth_results"].get("dkim", [])
|
||||
|
||||
spf_result = spf_results[0]["result"] if spf_results else None
|
||||
dkim_result = dkim_results[0]["result"] if dkim_results else None
|
||||
|
||||
# Build UDM event
|
||||
event: dict[str, Any] = {
|
||||
"event_type": "DMARC_AGGREGATE",
|
||||
"metadata": {
|
||||
"event_timestamp": self._format_timestamp(interval_begin),
|
||||
"event_type": "GENERIC_EVENT",
|
||||
"product_name": "parsedmarc",
|
||||
"vendor_name": self.static_observer_vendor,
|
||||
},
|
||||
"principal": {
|
||||
"ip": [source_ip],
|
||||
},
|
||||
"target": {
|
||||
"domain": {
|
||||
"name": header_from,
|
||||
}
|
||||
},
|
||||
"security_result": [
|
||||
{
|
||||
"severity": self._get_severity(
|
||||
disposition, spf_aligned, dkim_aligned
|
||||
),
|
||||
"description": self._get_description(
|
||||
dmarc_pass,
|
||||
spf_result,
|
||||
dkim_result,
|
||||
spf_aligned,
|
||||
dkim_aligned,
|
||||
disposition,
|
||||
),
|
||||
"detection_fields": [
|
||||
{"key": "dmarc.disposition", "value": disposition},
|
||||
{"key": "dmarc.policy", "value": policy_published["p"]},
|
||||
{"key": "dmarc.pass", "value": dmarc_pass},
|
||||
{"key": "dmarc.spf_aligned", "value": spf_aligned},
|
||||
{"key": "dmarc.dkim_aligned", "value": dkim_aligned},
|
||||
{"key": "dmarc.header_from", "value": header_from},
|
||||
{"key": "dmarc.envelope_from", "value": envelope_from},
|
||||
{"key": "dmarc.report_org", "value": report_metadata["org_name"]},
|
||||
{"key": "dmarc.report_id", "value": report_metadata["report_id"]},
|
||||
{"key": "dmarc.report_begin", "value": report_metadata["begin_date"]},
|
||||
{"key": "dmarc.report_end", "value": report_metadata["end_date"]},
|
||||
{"key": "dmarc.row_count", "value": count},
|
||||
],
|
||||
}
|
||||
],
|
||||
}
|
||||
|
||||
# Add optional fields to detection_fields
|
||||
if spf_result:
|
||||
event["security_result"][0]["detection_fields"].append(
|
||||
{"key": "dmarc.spf_result", "value": spf_result}
|
||||
)
|
||||
if dkim_result:
|
||||
event["security_result"][0]["detection_fields"].append(
|
||||
{"key": "dmarc.dkim_result", "value": dkim_result}
|
||||
)
|
||||
if source_name:
|
||||
event["security_result"][0]["detection_fields"].append(
|
||||
{"key": "dmarc.source_service_name", "value": source_name}
|
||||
)
|
||||
if source_type:
|
||||
event["security_result"][0]["detection_fields"].append(
|
||||
{"key": "dmarc.source_service_type", "value": source_type}
|
||||
)
|
||||
|
||||
# Add optional context fields (low-value, not for dashboarding)
|
||||
additional_context = []
|
||||
|
||||
if source_base_domain:
|
||||
additional_context.append(
|
||||
{"key": "source_base_domain", "value": source_base_domain}
|
||||
)
|
||||
|
||||
if self.static_environment:
|
||||
additional_context.append(
|
||||
{"key": "environment", "value": self.static_environment}
|
||||
)
|
||||
|
||||
# Add SPF auth results to context
|
||||
if spf_results:
|
||||
for idx, spf in enumerate(spf_results):
|
||||
additional_context.append(
|
||||
{"key": f"spf_{idx}_domain", "value": spf.get("domain", "")}
|
||||
)
|
||||
additional_context.append(
|
||||
{"key": f"spf_{idx}_result", "value": spf.get("result", "")}
|
||||
)
|
||||
|
||||
# Add DKIM auth results to context
|
||||
if dkim_results:
|
||||
for idx, dkim in enumerate(dkim_results):
|
||||
additional_context.append(
|
||||
{"key": f"dkim_{idx}_domain", "value": dkim.get("domain", "")}
|
||||
)
|
||||
additional_context.append(
|
||||
{"key": f"dkim_{idx}_result", "value": dkim.get("result", "")}
|
||||
)
|
||||
|
||||
# Only add additional section if there's context to include
|
||||
if additional_context:
|
||||
event["additional"] = {"fields": additional_context}
|
||||
|
||||
# Add optional UDM fields
|
||||
if source_country:
|
||||
event["principal"]["location"] = {"country_or_region": source_country}
|
||||
|
||||
if source_reverse_dns:
|
||||
event["principal"]["hostname"] = source_reverse_dns
|
||||
|
||||
if self.static_observer_name:
|
||||
event["metadata"]["product_deployment_id"] = self.static_observer_name
|
||||
|
||||
events.append(json.dumps(event, ensure_ascii=False))
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error converting aggregate report to Google SecOps format: {e}")
|
||||
# Generate error event
|
||||
error_event: dict[str, Any] = {
|
||||
"event_type": "DMARC_PARSE_ERROR",
|
||||
"metadata": {
|
||||
"event_timestamp": datetime.now(timezone.utc).isoformat(),
|
||||
"event_type": "GENERIC_EVENT",
|
||||
"product_name": "parsedmarc",
|
||||
"vendor_name": self.static_observer_vendor,
|
||||
},
|
||||
"security_result": [
|
||||
{
|
||||
"severity": "ERROR",
|
||||
"description": f"Failed to parse DMARC aggregate report: {str(e)}",
|
||||
}
|
||||
],
|
||||
}
|
||||
events.append(json.dumps(error_event, ensure_ascii=False))
|
||||
|
||||
# Output events (to stdout or API)
|
||||
self._output_events(events)
|
||||
|
||||
# Return events only if using stdout (for CLI to print)
|
||||
return events if self.use_stdout else []
|
||||
|
||||
def save_forensic_report_to_google_secops(
|
||||
self, forensic_report: dict[str, Any]
|
||||
) -> list[str]:
|
||||
"""
|
||||
Convert forensic DMARC report to Google SecOps UDM format and send to Chronicle
|
||||
|
||||
When use_stdout=False: Events are sent to Chronicle API, returns empty list
|
||||
When use_stdout=True: Returns list of NDJSON event strings for stdout
|
||||
|
||||
Args:
|
||||
forensic_report: Forensic report dictionary from parsedmarc
|
||||
|
||||
Returns:
|
||||
List of NDJSON event strings (empty if sent to API)
|
||||
"""
|
||||
logger.debug("Converting forensic report to Google SecOps UDM format")
|
||||
events = []
|
||||
|
||||
try:
|
||||
source_ip = forensic_report["source"]["ip_address"]
|
||||
source_country = forensic_report["source"].get("country")
|
||||
source_reverse_dns = forensic_report["source"].get("reverse_dns")
|
||||
source_base_domain = forensic_report["source"].get("base_domain")
|
||||
source_name = forensic_report["source"].get("name")
|
||||
source_type = forensic_report["source"].get("type")
|
||||
|
||||
reported_domain = forensic_report["reported_domain"]
|
||||
arrival_date = forensic_report["arrival_date_utc"]
|
||||
auth_failure = forensic_report.get("auth_failure", [])
|
||||
|
||||
# Determine severity - forensic reports indicate failures
|
||||
# Default to MEDIUM for authentication failures
|
||||
severity = "MEDIUM"
|
||||
|
||||
# Build description
|
||||
auth_failure_str = ", ".join(auth_failure) if auth_failure else "unknown"
|
||||
description = f"DMARC forensic report: authentication failure ({auth_failure_str})"
|
||||
|
||||
# Build UDM event
|
||||
event: dict[str, Any] = {
|
||||
"event_type": "DMARC_FORENSIC",
|
||||
"metadata": {
|
||||
"event_timestamp": self._format_timestamp(arrival_date),
|
||||
"event_type": "GENERIC_EVENT",
|
||||
"product_name": "parsedmarc",
|
||||
"vendor_name": self.static_observer_vendor,
|
||||
},
|
||||
"principal": {
|
||||
"ip": [source_ip],
|
||||
},
|
||||
"target": {
|
||||
"domain": {
|
||||
"name": reported_domain,
|
||||
}
|
||||
},
|
||||
"security_result": [
|
||||
{
|
||||
"severity": severity,
|
||||
"description": description,
|
||||
"detection_fields": [
|
||||
{"key": "dmarc.auth_failure", "value": auth_failure_str},
|
||||
{"key": "dmarc.reported_domain", "value": reported_domain},
|
||||
],
|
||||
}
|
||||
],
|
||||
}
|
||||
|
||||
# Add optional fields to detection_fields (matching aggregate report field names)
|
||||
if source_name:
|
||||
event["security_result"][0]["detection_fields"].append(
|
||||
{"key": "dmarc.source_service_name", "value": source_name}
|
||||
)
|
||||
if source_type:
|
||||
event["security_result"][0]["detection_fields"].append(
|
||||
{"key": "dmarc.source_service_type", "value": source_type}
|
||||
)
|
||||
|
||||
# Add optional context fields (low-value, not for dashboarding)
|
||||
additional_context = []
|
||||
|
||||
if source_base_domain:
|
||||
additional_context.append(
|
||||
{"key": "source_base_domain", "value": source_base_domain}
|
||||
)
|
||||
|
||||
if forensic_report.get("feedback_type"):
|
||||
additional_context.append(
|
||||
{"key": "feedback_type", "value": forensic_report["feedback_type"]}
|
||||
)
|
||||
|
||||
if forensic_report.get("message_id"):
|
||||
additional_context.append(
|
||||
{"key": "message_id", "value": forensic_report["message_id"]}
|
||||
)
|
||||
|
||||
if forensic_report.get("authentication_results"):
|
||||
additional_context.append(
|
||||
{"key": "authentication_results", "value": forensic_report["authentication_results"]}
|
||||
)
|
||||
|
||||
if forensic_report.get("delivery_result"):
|
||||
additional_context.append(
|
||||
{"key": "delivery_result", "value": forensic_report["delivery_result"]}
|
||||
)
|
||||
|
||||
if self.static_environment:
|
||||
additional_context.append(
|
||||
{"key": "environment", "value": self.static_environment}
|
||||
)
|
||||
|
||||
# Add payload excerpt if enabled
|
||||
if self.include_ruf_payload and forensic_report.get("sample"):
|
||||
sample = forensic_report["sample"]
|
||||
if len(sample) > self.ruf_payload_max_bytes:
|
||||
sample = sample[:self.ruf_payload_max_bytes] + "... [truncated]"
|
||||
additional_context.append(
|
||||
{"key": "message_sample", "value": sample}
|
||||
)
|
||||
|
||||
# Only add additional section if there's context to include
|
||||
if additional_context:
|
||||
event["additional"] = {"fields": additional_context}
|
||||
|
||||
# Add optional UDM fields
|
||||
if source_country:
|
||||
event["principal"]["location"] = {"country_or_region": source_country}
|
||||
|
||||
if source_reverse_dns:
|
||||
event["principal"]["hostname"] = source_reverse_dns
|
||||
|
||||
if self.static_observer_name:
|
||||
event["metadata"]["product_deployment_id"] = self.static_observer_name
|
||||
|
||||
events.append(json.dumps(event, ensure_ascii=False))
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error converting forensic report to Google SecOps format: {e}")
|
||||
# Generate error event
|
||||
error_event: dict[str, Any] = {
|
||||
"event_type": "DMARC_PARSE_ERROR",
|
||||
"metadata": {
|
||||
"event_timestamp": datetime.now(timezone.utc).isoformat(),
|
||||
"event_type": "GENERIC_EVENT",
|
||||
"product_name": "parsedmarc",
|
||||
"vendor_name": self.static_observer_vendor,
|
||||
},
|
||||
"security_result": [
|
||||
{
|
||||
"severity": "ERROR",
|
||||
"description": f"Failed to parse DMARC forensic report: {str(e)}",
|
||||
}
|
||||
],
|
||||
}
|
||||
events.append(json.dumps(error_event, ensure_ascii=False))
|
||||
|
||||
# Output events (to stdout or API)
|
||||
self._output_events(events)
|
||||
|
||||
# Return events only if using stdout (for CLI to print)
|
||||
return events if self.use_stdout else []
|
||||
|
||||
def save_smtp_tls_report_to_google_secops(
|
||||
self, smtp_tls_report: dict[str, Any]
|
||||
) -> list[str]:
|
||||
"""
|
||||
Convert SMTP TLS report to Google SecOps UDM format and send to Chronicle
|
||||
|
||||
When use_stdout=False: Events are sent to Chronicle API, returns empty list
|
||||
When use_stdout=True: Returns list of NDJSON event strings for stdout
|
||||
|
||||
Args:
|
||||
smtp_tls_report: SMTP TLS report dictionary from parsedmarc
|
||||
|
||||
Returns:
|
||||
List of NDJSON event strings (empty if sent to API)
|
||||
"""
|
||||
logger.debug("Converting SMTP TLS report to Google SecOps UDM format")
|
||||
events = []
|
||||
|
||||
try:
|
||||
organization_name = smtp_tls_report.get("organization_name", "")
|
||||
begin_date = smtp_tls_report["begin_date"]
|
||||
end_date = smtp_tls_report["end_date"]
|
||||
|
||||
for policy in smtp_tls_report.get("policies", []):
|
||||
policy_domain = policy["policy_domain"]
|
||||
|
||||
for failure in policy.get("failure_details", []):
|
||||
# Build UDM event for each failure
|
||||
event: dict[str, Any] = {
|
||||
"event_type": "SMTP_TLS_REPORT",
|
||||
"metadata": {
|
||||
"event_timestamp": self._format_timestamp(begin_date),
|
||||
"event_type": "GENERIC_EVENT",
|
||||
"product_name": "parsedmarc",
|
||||
"vendor_name": self.static_observer_vendor,
|
||||
},
|
||||
"target": {
|
||||
"domain": {
|
||||
"name": policy_domain,
|
||||
}
|
||||
},
|
||||
"security_result": [
|
||||
{
|
||||
"severity": "LOW",
|
||||
"description": f"SMTP TLS failure: {failure.get('result_type', 'unknown')}",
|
||||
"detection_fields": [
|
||||
{"key": "smtp_tls.policy_domain", "value": policy_domain},
|
||||
{"key": "smtp_tls.result_type", "value": failure.get("result_type", "")},
|
||||
{"key": "smtp_tls.failed_session_count", "value": failure.get("failed_session_count", 0)},
|
||||
{"key": "smtp_tls.report_org", "value": organization_name},
|
||||
{"key": "smtp_tls.report_begin", "value": begin_date},
|
||||
{"key": "smtp_tls.report_end", "value": end_date},
|
||||
],
|
||||
}
|
||||
],
|
||||
}
|
||||
|
||||
# Add optional context fields (low-value, not for dashboarding)
|
||||
additional_context = []
|
||||
|
||||
if self.static_environment:
|
||||
additional_context.append(
|
||||
{"key": "environment", "value": self.static_environment}
|
||||
)
|
||||
|
||||
# Only add additional section if there's context to include
|
||||
if additional_context:
|
||||
event["additional"] = {"fields": additional_context}
|
||||
|
||||
# Add optional UDM fields
|
||||
if failure.get("sending_mta_ip"):
|
||||
event["principal"] = {"ip": [failure["sending_mta_ip"]]}
|
||||
|
||||
if self.static_observer_name:
|
||||
event["metadata"]["product_deployment_id"] = self.static_observer_name
|
||||
|
||||
events.append(json.dumps(event, ensure_ascii=False))
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error converting SMTP TLS report to Google SecOps format: {e}")
|
||||
# Generate error event
|
||||
error_event: dict[str, Any] = {
|
||||
"event_type": "DMARC_PARSE_ERROR",
|
||||
"metadata": {
|
||||
"event_timestamp": datetime.now(timezone.utc).isoformat(),
|
||||
"event_type": "GENERIC_EVENT",
|
||||
"product_name": "parsedmarc",
|
||||
"vendor_name": self.static_observer_vendor,
|
||||
},
|
||||
"security_result": [
|
||||
{
|
||||
"severity": "ERROR",
|
||||
"description": f"Failed to parse SMTP TLS report: {str(e)}",
|
||||
}
|
||||
],
|
||||
}
|
||||
events.append(json.dumps(error_event, ensure_ascii=False))
|
||||
|
||||
# Output events (to stdout or API)
|
||||
self._output_events(events)
|
||||
|
||||
# Return events only if using stdout (for CLI to print)
|
||||
return events if self.use_stdout else []
|
||||
@@ -29,7 +29,7 @@ classifiers = [
|
||||
"Operating System :: OS Independent",
|
||||
"Programming Language :: Python :: 3"
|
||||
]
|
||||
requires-python = ">=3.9, <3.14"
|
||||
requires-python = ">=3.9"
|
||||
dependencies = [
|
||||
"azure-identity>=1.8.0",
|
||||
"azure-monitor-ingestion>=1.0.0",
|
||||
@@ -48,7 +48,7 @@ dependencies = [
|
||||
"imapclient>=2.1.0",
|
||||
"kafka-python-ng>=2.2.2",
|
||||
"lxml>=4.4.0",
|
||||
"mailsuite>=1.11.1",
|
||||
"mailsuite>=1.11.2",
|
||||
"msgraph-core==0.2.2",
|
||||
"opensearch-py>=2.4.2,<=3.0.0",
|
||||
"publicsuffixlist>=0.10.0",
|
||||
|
||||
164
tests.py
164
tests.py
@@ -3,7 +3,6 @@
|
||||
|
||||
from __future__ import absolute_import, print_function, unicode_literals
|
||||
|
||||
import json
|
||||
import os
|
||||
import unittest
|
||||
from glob import glob
|
||||
@@ -157,169 +156,6 @@ class Test(unittest.TestCase):
|
||||
parsedmarc.parsed_smtp_tls_reports_to_csv(parsed_report)
|
||||
print("Passed!")
|
||||
|
||||
def testGoogleSecOpsAggregateReport(self):
|
||||
"""Test Google SecOps aggregate report conversion"""
|
||||
print()
|
||||
from parsedmarc.google_secops import GoogleSecOpsClient
|
||||
|
||||
client = GoogleSecOpsClient(use_stdout=True)
|
||||
sample_path = "samples/aggregate/example.net!example.com!1529366400!1529452799.xml"
|
||||
print("Testing Google SecOps aggregate conversion for {0}: ".format(sample_path), end="")
|
||||
|
||||
parsed_file = parsedmarc.parse_report_file(sample_path, always_use_local_files=True)
|
||||
parsed_report = parsed_file["report"]
|
||||
|
||||
events = client.save_aggregate_report_to_google_secops(parsed_report)
|
||||
|
||||
# Verify we got events
|
||||
assert len(events) > 0, "Expected at least one event"
|
||||
|
||||
# Verify each event is valid JSON
|
||||
for event in events:
|
||||
event_dict = json.loads(event)
|
||||
assert "event_type" in event_dict
|
||||
assert event_dict["event_type"] == "DMARC_AGGREGATE"
|
||||
assert "metadata" in event_dict
|
||||
assert "principal" in event_dict
|
||||
assert "target" in event_dict
|
||||
assert "security_result" in event_dict
|
||||
|
||||
print("Passed!")
|
||||
|
||||
def testGoogleSecOpsForensicReport(self):
|
||||
"""Test Google SecOps forensic report conversion"""
|
||||
print()
|
||||
from parsedmarc.google_secops import GoogleSecOpsClient
|
||||
|
||||
# Test without payload
|
||||
client = GoogleSecOpsClient(include_ruf_payload=False, use_stdout=True)
|
||||
sample_path = "samples/forensic/dmarc_ruf_report_linkedin.eml"
|
||||
print("Testing Google SecOps forensic conversion (no payload) for {0}: ".format(sample_path), end="")
|
||||
|
||||
parsed_file = parsedmarc.parse_report_file(sample_path)
|
||||
parsed_report = parsed_file["report"]
|
||||
|
||||
events = client.save_forensic_report_to_google_secops(parsed_report)
|
||||
|
||||
# Verify we got events
|
||||
assert len(events) > 0, "Expected at least one event"
|
||||
|
||||
# Verify each event is valid JSON
|
||||
for event in events:
|
||||
event_dict = json.loads(event)
|
||||
assert "event_type" in event_dict
|
||||
assert event_dict["event_type"] == "DMARC_FORENSIC"
|
||||
|
||||
# Verify no payload in additional fields
|
||||
if "additional" in event_dict and "fields" in event_dict["additional"]:
|
||||
for field in event_dict["additional"]["fields"]:
|
||||
assert field["key"] != "message_sample", "Payload should not be included when disabled"
|
||||
|
||||
print("Passed!")
|
||||
|
||||
# Test with payload
|
||||
client_with_payload = GoogleSecOpsClient(
|
||||
include_ruf_payload=True,
|
||||
ruf_payload_max_bytes=100,
|
||||
use_stdout=True
|
||||
)
|
||||
print("Testing Google SecOps forensic conversion (with payload) for {0}: ".format(sample_path), end="")
|
||||
|
||||
events_with_payload = client_with_payload.save_forensic_report_to_google_secops(parsed_report)
|
||||
|
||||
# Verify we got events
|
||||
assert len(events_with_payload) > 0, "Expected at least one event"
|
||||
|
||||
# Verify payload is included
|
||||
for event in events_with_payload:
|
||||
event_dict = json.loads(event)
|
||||
|
||||
# Check if message_sample is in additional fields
|
||||
has_sample = False
|
||||
if "additional" in event_dict and "fields" in event_dict["additional"]:
|
||||
for field in event_dict["additional"]["fields"]:
|
||||
if field["key"] == "message_sample":
|
||||
has_sample = True
|
||||
# Verify truncation: max_bytes (100) + "... [truncated]" suffix (16 chars)
|
||||
# Allow some margin for the actual payload length
|
||||
max_expected_length = 100 + len("... [truncated]") + 10
|
||||
assert len(field["value"]) <= max_expected_length, f"Payload should be truncated, got {len(field['value'])} bytes"
|
||||
break
|
||||
|
||||
assert has_sample, "Payload should be included when enabled"
|
||||
|
||||
print("Passed!")
|
||||
|
||||
def testGoogleSecOpsConfiguration(self):
|
||||
"""Test Google SecOps client configuration"""
|
||||
print()
|
||||
from parsedmarc.google_secops import GoogleSecOpsClient
|
||||
|
||||
print("Testing Google SecOps client configuration: ", end="")
|
||||
|
||||
# Test stdout configuration
|
||||
client1 = GoogleSecOpsClient(use_stdout=True)
|
||||
assert client1.include_ruf_payload is False
|
||||
assert client1.ruf_payload_max_bytes == 4096
|
||||
assert client1.static_observer_vendor == "parsedmarc"
|
||||
assert client1.static_observer_name is None
|
||||
assert client1.static_environment is None
|
||||
assert client1.use_stdout is True
|
||||
|
||||
# Test custom configuration
|
||||
client2 = GoogleSecOpsClient(
|
||||
include_ruf_payload=True,
|
||||
ruf_payload_max_bytes=8192,
|
||||
static_observer_name="test-observer",
|
||||
static_observer_vendor="test-vendor",
|
||||
static_environment="prod",
|
||||
use_stdout=True
|
||||
)
|
||||
assert client2.include_ruf_payload is True
|
||||
assert client2.ruf_payload_max_bytes == 8192
|
||||
assert client2.static_observer_name == "test-observer"
|
||||
assert client2.static_observer_vendor == "test-vendor"
|
||||
assert client2.static_environment == "prod"
|
||||
|
||||
print("Passed!")
|
||||
|
||||
def testGoogleSecOpsSmtpTlsReport(self):
|
||||
"""Test Google SecOps SMTP TLS report conversion"""
|
||||
print()
|
||||
from parsedmarc.google_secops import GoogleSecOpsClient
|
||||
|
||||
client = GoogleSecOpsClient(use_stdout=True)
|
||||
sample_path = "samples/smtp_tls/rfc8460.json"
|
||||
print("Testing Google SecOps SMTP TLS conversion for {0}: ".format(sample_path), end="")
|
||||
|
||||
parsed_file = parsedmarc.parse_report_file(sample_path)
|
||||
parsed_report = parsed_file["report"]
|
||||
|
||||
events = client.save_smtp_tls_report_to_google_secops(parsed_report)
|
||||
|
||||
# Verify we got events
|
||||
assert len(events) > 0, "Expected at least one event"
|
||||
|
||||
# Verify each event is valid JSON
|
||||
for event in events:
|
||||
event_dict = json.loads(event)
|
||||
assert "event_type" in event_dict
|
||||
assert event_dict["event_type"] == "SMTP_TLS_REPORT"
|
||||
assert "metadata" in event_dict
|
||||
assert "target" in event_dict
|
||||
assert "security_result" in event_dict
|
||||
|
||||
# Verify failed_session_count is in detection_fields as an integer
|
||||
found_count = False
|
||||
for field in event_dict["security_result"][0]["detection_fields"]:
|
||||
if field["key"] == "smtp_tls.failed_session_count":
|
||||
assert isinstance(field["value"], int), "failed_session_count should be an integer"
|
||||
found_count = True
|
||||
break
|
||||
assert found_count, "failed_session_count should be in detection_fields"
|
||||
|
||||
print("Passed!")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main(verbosity=2)
|
||||
|
||||
Reference in New Issue
Block a user