Compare commits


19 Commits

| Author | SHA1 | Message | Date |
| --- | --- | --- | --- |
| copilot-swe-agent[bot] | `dab2aaffda` | Update documentation with comprehensive field listings and correct service type examples | 2026-01-14 16:56:54 +00:00 |
| copilot-swe-agent[bot] | `19e8b498d0` | Add source enrichment fields to forensic events matching aggregate reports | 2026-01-14 16:45:26 +00:00 |
| copilot-swe-agent[bot] | `91ae56c029` | Add Chronicle Ingestion API support as primary method with stdout as alternative | 2026-01-13 16:04:52 +00:00 |
| copilot-swe-agent[bot] | `e0818a22f4` | Add IP enrichment fields to detection_fields for enhanced Chronicle filtering | 2026-01-13 15:29:11 +00:00 |
| copilot-swe-agent[bot] | `da43efa4bf` | Move DMARC dimensions to detection_fields for Chronicle dashboard support | 2026-01-09 18:54:30 +00:00 |
| copilot-swe-agent[bot] | `cf916509ea` | Add SMTP TLS test and fix YARA-L boolean comparisons | 2026-01-09 18:26:37 +00:00 |
| copilot-swe-agent[bot] | `6ad7233983` | Preserve native types for booleans and integers in UDM output | 2026-01-09 16:29:51 +00:00 |
| copilot-swe-agent[bot] | `63f8334e27` | Add SMTP TLS and parse error event examples to documentation | 2026-01-09 16:23:27 +00:00 |
| copilot-swe-agent[bot] | `1aa0147c33` | Add mailbox monitoring examples to Google SecOps documentation | 2026-01-09 16:05:36 +00:00 |
| copilot-swe-agent[bot] | `e9b4170591` | Fix YARA-L rules to match actual event_type values in output | 2026-01-09 15:43:16 +00:00 |
| copilot-swe-agent[bot] | `d3a314171f` | Replace Splunk SPL with YARA-L in Google SecOps search examples | 2026-01-09 15:31:18 +00:00 |
| copilot-swe-agent[bot] | `b7823253a4` | Fix SMTP TLS report policy domain extraction | 2026-01-09 15:15:50 +00:00 |
| copilot-swe-agent[bot] | `1887460ab6` | Address code review feedback: remove redundant condition and improve test | 2026-01-09 15:12:33 +00:00 |
| copilot-swe-agent[bot] | `c84ddb4e89` | Add Google SecOps documentation and update README | 2026-01-09 15:10:30 +00:00 |
| copilot-swe-agent[bot] | `78c863bd12` | Add Google SecOps output module implementation | 2026-01-09 15:07:47 +00:00 |
| copilot-swe-agent[bot] | `12b9b37026` | Initial plan | 2026-01-09 14:56:48 +00:00 |
| Sean Whalen | `0e3a4b0f06` | 9.0.9: Validate that a string is base64-encoded before trying to base64 decode it. (PRs #648 and #649) | 2026-01-08 13:29:23 -05:00 |
| maraspr | `343b53ef18` | remove newlines before b64decode (#649) | 2026-01-08 12:24:20 -05:00 |
| maraspr | `792079a3e8` | Validate that string is base64 (#648) | 2026-01-08 10:15:27 -05:00 |

All copilot-swe-agent[bot] commits carry the trailer `Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>`.
12 changed files with 1474 additions and 112 deletions


@@ -1,5 +1,11 @@
 # Changelog
+## 9.0.9
+### Fixes
+- Validate that a string is base64-encoded before trying to base64 decode it. (PRs #648 and #649)
 ## 9.0.8
 ### Fixes


@@ -44,6 +44,7 @@ Thanks to all
 - Optionally send the results to Elasticsearch, Opensearch, and/or Splunk, for
   use with premade dashboards
 - Optionally send reports to Apache Kafka
+- Optionally send reports to Google SecOps (Chronicle) in UDM format via API or stdout
 ## Python Compatibility


@@ -29,14 +29,3 @@ token_file = /etc/example/token.json
 include_spam_trash = True
 paginate_messages = True
 scopes = https://www.googleapis.com/auth/gmail.modify
-
-[msgraph]
-auth_method = ClientSecret
-client_id = 12345678-90ab-cdef-1234-567890abcdef
-client_secret = your-client-secret-here
-tenant_id = 12345678-90ab-cdef-1234-567890abcdef
-mailbox = dmarc-reports@example.com
-# Use standard folder names - they work across all locales
-# and avoid "Default folder Root not found" errors
-reports_folder = Inbox
-archive_folder = Archive


@@ -0,0 +1,494 @@
# Google SecOps (Chronicle) Output
`parsedmarc` can output DMARC reports to Google SecOps (Chronicle) in UDM (Unified Data Model) format.
## Configuration
To enable Google SecOps output, add a `[google_secops]` section to your configuration file.
### Primary Method: Chronicle Ingestion API
The recommended approach is to send events directly to Chronicle via the Ingestion API:
```ini
[general]
save_aggregate = True
save_forensic = True
[google_secops]
# Required: Path to Google service account JSON credentials file
api_credentials_file = /path/to/service-account-credentials.json
# Required: Chronicle customer ID
api_customer_id = your-customer-id-here
# Optional: Chronicle region (default: us)
# Options: us, europe, asia-southeast1, me-central2, australia-southeast1
api_region = us
# Optional: Log type for Chronicle ingestion (default: DMARC)
api_log_type = DMARC
# Optional: Include forensic report message payload (default: False)
# For privacy, message bodies are excluded by default
include_ruf_payload = False
# Optional: Maximum bytes of forensic message payload to include (default: 4096)
ruf_payload_max_bytes = 4096
# Optional: Static observer name for telemetry identification
static_observer_name = my-parsedmarc-instance
# Optional: Static observer vendor (default: parsedmarc)
static_observer_vendor = parsedmarc
# Optional: Static environment label (e.g., prod, dev)
static_environment = prod
```
### Alternative Method: stdout Output
If you prefer to use an external log shipper (Fluentd, Logstash, Chronicle forwarder), set `use_stdout = True`:
```ini
[google_secops]
# Output to stdout instead of Chronicle API
use_stdout = True
# Other optional configuration options (as above)
include_ruf_payload = False
ruf_payload_max_bytes = 4096
static_observer_name = my-instance
static_observer_vendor = parsedmarc
static_environment = prod
```
## Output Format
The Google SecOps output produces newline-delimited JSON (NDJSON) in Chronicle UDM format, which can be ingested into Google SecOps for hunting and dashboarding.
### Event Types
1. **DMARC_AGGREGATE**: One event per aggregate report row, preserving count and period information
2. **DMARC_FORENSIC**: One event per forensic report
3. **SMTP_TLS_REPORT**: One event per SMTP TLS failure detail
4. **DMARC_PARSE_ERROR**: Generated when parsing fails (does not crash)
### UDM Schema
Each event includes:
- **metadata**: Event timestamp, type, product name, and vendor
- `event_timestamp`: RFC 3339 formatted timestamp
- `event_type`: Always "GENERIC_EVENT" for UDM
- `product_name`: Always "parsedmarc"
- `vendor_name`: Configurable via `static_observer_vendor` (default: "parsedmarc")
- `product_deployment_id` (optional): Set via `static_observer_name` config
- **principal**: Source IP address, location (country), and hostname (reverse DNS)
- `ip`: Array containing source IP address (always present)
- `location.country_or_region` (optional): ISO country code from IP geolocation
- `hostname` (optional): Reverse DNS hostname for source IP
- **target**: Domain name (from DMARC policy)
- `domain.name`: The domain being protected by DMARC
- **security_result**: Severity level, description, and detection fields for dashboarding
- `severity`: Derived severity level (HIGH/MEDIUM/LOW/ERROR)
- `description`: Human-readable event description
- **detection_fields**: Key DMARC dimensions for filtering and grouping
- All dashboard-relevant fields use `dmarc.*` or `smtp_tls.*` prefixes for easy identification
- Includes IP enrichment data (service name and type from reverse DNS mapping) for enhanced filtering
- See "Detection Fields" section below for complete field listings
- **additional.fields** (optional): Low-value context fields not typically used for dashboarding
- SPF/DKIM authentication details (e.g., `spf_0_domain`, `spf_0_result`)
- Forensic report metadata (e.g., `feedback_type`, `message_id`, `authentication_results`)
- Base domain, environment tags, optional message samples
**Design Rationale**: DMARC dimensions are placed in `security_result[].detection_fields` rather than `additional.fields` because Chronicle dashboards, stats searches, and aggregations work best with UDM label arrays. `additional.fields` is a protobuf Struct intended for opaque context and is not reliably queryable in dashboard operations.
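To illustrate the split, here is a hypothetical helper (the name and shape are illustrative only, not part of the parsedmarc API) that renders flat DMARC dimensions as a `detection_fields` label array:

```python
def to_detection_fields(dimensions: dict) -> list:
    """Render flat DMARC dimensions as a UDM-style key/value label array.

    Native types (booleans, integers) are preserved, matching the
    "Preserve native types" behavior described in this changeset.
    """
    return [{"key": k, "value": v} for k, v in sorted(dimensions.items())]


labels = to_detection_fields({"dmarc.policy": "none", "dmarc.pass": False})
print(labels)
```

Keeping the dashboard-relevant dimensions in a label array like this is what makes them usable in Chronicle stats searches and groupings.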
### Detection Fields
**Aggregate Report Fields** (`DMARC_AGGREGATE` events):
- `dmarc.disposition`: DMARC policy action (none, quarantine, reject)
- `dmarc.policy`: Published DMARC policy (none, quarantine, reject)
- `dmarc.pass`: Boolean - overall DMARC authentication result
- `dmarc.spf_aligned`: Boolean - SPF alignment status
- `dmarc.dkim_aligned`: Boolean - DKIM alignment status
- `dmarc.header_from`: Header From domain
- `dmarc.envelope_from`: Envelope From domain (MAIL FROM)
- `dmarc.report_org`: Reporting organization name
- `dmarc.report_id`: Unique report identifier
- `dmarc.report_begin`: Report period start timestamp
- `dmarc.report_end`: Report period end timestamp
- `dmarc.row_count`: Number of messages represented by this record
- `dmarc.spf_result` (optional): SPF authentication result (pass, fail, etc.)
- `dmarc.dkim_result` (optional): DKIM authentication result (pass, fail, etc.)
- `dmarc.source_service_name` (optional): Enriched service name from reverse DNS mapping
- `dmarc.source_service_type` (optional): Enriched service type (e.g., "Email Provider", "Webmail", "Marketing")
**Forensic Report Fields** (`DMARC_FORENSIC` events):
- `dmarc.auth_failure`: Authentication failure type(s) (dmarc, spf, dkim)
- `dmarc.reported_domain`: Domain that failed DMARC authentication
- `dmarc.source_service_name` (optional): Enriched service name from reverse DNS mapping
- `dmarc.source_service_type` (optional): Enriched service type (e.g., "Email Provider", "Webmail", "Marketing")
**SMTP TLS Report Fields** (`SMTP_TLS_REPORT` events):
- `smtp_tls.policy_domain`: Domain being monitored for TLS policy
- `smtp_tls.result_type`: Type of TLS failure (certificate-expired, validation-failure, etc.)
- `smtp_tls.failed_session_count`: Number of failed sessions
- `smtp_tls.report_org`: Reporting organization name
- `smtp_tls.report_begin`: Report period start timestamp
- `smtp_tls.report_end`: Report period end timestamp
### Severity Heuristics
- **HIGH**: DMARC disposition = reject
- **MEDIUM**: DMARC disposition = quarantine when neither SPF nor DKIM is aligned
- **LOW**: all other cases, including disposition = none and passing or partially aligned mail
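The same heuristic appears in the module's `_get_severity` helper; as a standalone sketch:

```python
def get_severity(disposition: str, spf_aligned: bool, dkim_aligned: bool) -> str:
    """Derive event severity from DMARC disposition and alignment."""
    if disposition == "reject":
        return "HIGH"  # mail was rejected outright
    if disposition == "quarantine" and not (spf_aligned or dkim_aligned):
        return "MEDIUM"  # quarantined with no aligned identifier
    return "LOW"  # everything else, including disposition=none


print(get_severity("quarantine", False, False))  # MEDIUM
```

Note that quarantine with at least one aligned identifier falls through to LOW.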
## Example Output
### Aggregate Report Event
```json
{
  "event_type": "DMARC_AGGREGATE",
  "metadata": {
    "event_timestamp": "2018-06-19T00:00:00+00:00",
    "event_type": "GENERIC_EVENT",
    "product_name": "parsedmarc",
    "vendor_name": "parsedmarc"
  },
  "principal": {
    "ip": ["199.230.200.36"],
    "location": {"country_or_region": "US"}
  },
  "target": {
    "domain": {"name": "example.com"}
  },
  "security_result": [{
    "severity": "LOW",
    "description": "DMARC fail; SPF=pass; DKIM=pass; SPF not aligned; DKIM not aligned; disposition=none",
    "detection_fields": [
      {"key": "dmarc.disposition", "value": "none"},
      {"key": "dmarc.policy", "value": "none"},
      {"key": "dmarc.pass", "value": false},
      {"key": "dmarc.spf_aligned", "value": false},
      {"key": "dmarc.dkim_aligned", "value": false},
      {"key": "dmarc.header_from", "value": "example.com"},
      {"key": "dmarc.envelope_from", "value": "example.com"},
      {"key": "dmarc.report_org", "value": "example.net"},
      {"key": "dmarc.report_id", "value": "b043f0e264cf4ea995e93765242f6dfb"},
      {"key": "dmarc.report_begin", "value": "2018-06-19 00:00:00"},
      {"key": "dmarc.report_end", "value": "2018-06-19 23:59:59"},
      {"key": "dmarc.row_count", "value": 1},
      {"key": "dmarc.spf_result", "value": "pass"},
      {"key": "dmarc.dkim_result", "value": "pass"},
      {"key": "dmarc.source_service_name", "value": "Example Mail Service"},
      {"key": "dmarc.source_service_type", "value": "Email Provider"}
    ]
  }],
  "additional": {
    "fields": [
      {"key": "spf_0_domain", "value": "example.com"},
      {"key": "spf_0_result", "value": "pass"},
      {"key": "dkim_0_domain", "value": "example.com"},
      {"key": "dkim_0_result", "value": "pass"}
    ]
  }
}
```
### Forensic Report Event
```json
{
  "event_type": "DMARC_FORENSIC",
  "metadata": {
    "event_timestamp": "2019-04-30T02:09:00+00:00",
    "event_type": "GENERIC_EVENT",
    "product_name": "parsedmarc",
    "vendor_name": "parsedmarc"
  },
  "principal": {
    "ip": ["10.10.10.10"],
    "location": {"country_or_region": "US"},
    "hostname": "mail.example-sender.com"
  },
  "target": {
    "domain": {"name": "example.com"}
  },
  "security_result": [{
    "severity": "MEDIUM",
    "description": "DMARC forensic report: authentication failure (dmarc)",
    "detection_fields": [
      {"key": "dmarc.auth_failure", "value": "dmarc"},
      {"key": "dmarc.reported_domain", "value": "example.com"},
      {"key": "dmarc.source_service_name", "value": "Example Mail Provider"},
      {"key": "dmarc.source_service_type", "value": "Email Provider"}
    ]
  }],
  "additional": {
    "fields": [
      {"key": "feedback_type", "value": "auth-failure"},
      {"key": "message_id", "value": "<01010101010101010101010101010101@ABAB01MS0016.someserver.loc>"},
      {"key": "authentication_results", "value": "dmarc=fail (p=none; dis=none) header.from=example.com"},
      {"key": "delivery_result", "value": "delivered"}
    ]
  }
}
```
### SMTP TLS Report Event
```json
{
  "event_type": "SMTP_TLS_REPORT",
  "metadata": {
    "event_timestamp": "2016-04-01T00:00:00+00:00",
    "event_type": "GENERIC_EVENT",
    "product_name": "parsedmarc",
    "vendor_name": "parsedmarc"
  },
  "target": {
    "domain": {
      "name": "company-y.example"
    }
  },
  "security_result": [{
    "severity": "LOW",
    "description": "SMTP TLS failure: certificate-expired",
    "detection_fields": [
      {"key": "smtp_tls.policy_domain", "value": "company-y.example"},
      {"key": "smtp_tls.result_type", "value": "certificate-expired"},
      {"key": "smtp_tls.failed_session_count", "value": 100},
      {"key": "smtp_tls.report_org", "value": "Company-X"},
      {"key": "smtp_tls.report_begin", "value": "2016-04-01T00:00:00Z"},
      {"key": "smtp_tls.report_end", "value": "2016-04-01T23:59:59Z"}
    ]
  }],
  "principal": {
    "ip": ["2001:db8:abcd:0012::1"]
  }
}
```
### Parse Error Event
```json
{
  "event_type": "DMARC_PARSE_ERROR",
  "metadata": {
    "event_timestamp": "2026-01-09T16:22:10.933751+00:00",
    "event_type": "GENERIC_EVENT",
    "product_name": "parsedmarc",
    "vendor_name": "parsedmarc"
  },
  "security_result": [{
    "severity": "ERROR",
    "description": "Failed to parse DMARC report: Invalid XML structure"
  }]
}
```
## Google SecOps Searches
Here are some example YARA-L rules you can use in Google SecOps to hunt for DMARC issues:
### Find all DMARC aggregate report failures
```yara-l
rule dmarc_aggregate_failures {
  meta:
    author = "parsedmarc"
    description = "Detect DMARC authentication failures in aggregate reports"
  events:
    $e.metadata.product_name = "parsedmarc"
    $e.event_type = "DMARC_AGGREGATE"
    $e.security_result.detection_fields.key = "dmarc.pass"
    $e.security_result.detection_fields.value = false
  condition:
    $e
}
```
### Find high severity DMARC events (rejected mail)
```yara-l
rule high_severity_dmarc_events {
  meta:
    author = "parsedmarc"
    description = "Detect high severity DMARC aggregate events (rejected mail)"
  events:
    $e.metadata.product_name = "parsedmarc"
    $e.event_type = "DMARC_AGGREGATE"
    $e.security_result.severity = "HIGH"
  condition:
    $e
}
```
### Find repeated DMARC failures from same source IP
```yara-l
rule repeated_dmarc_failures {
  meta:
    author = "parsedmarc"
    description = "Detect repeated DMARC failures from the same source IP"
  events:
    $e.metadata.product_name = "parsedmarc"
    $e.event_type = "DMARC_AGGREGATE"
    $e.security_result.detection_fields.key = "dmarc.pass"
    $e.security_result.detection_fields.value = false
    $e.principal.ip = $source_ip
  match:
    $source_ip over 1h
  condition:
    #e > 5
}
```
### Find DMARC forensic reports with authentication failures
```yara-l
rule dmarc_forensic_failures {
  meta:
    author = "parsedmarc"
    description = "Detect DMARC forensic reports with authentication failures"
  events:
    $e.metadata.product_name = "parsedmarc"
    $e.event_type = "DMARC_FORENSIC"
    $e.security_result.detection_fields.key = "dmarc.auth_failure"
  condition:
    $e
}
```
### Find DMARC failures from specific mail service types
```yara-l
rule dmarc_failures_by_service_type {
  meta:
    author = "parsedmarc"
    description = "Detect DMARC failures from specific mail service types"
  events:
    $e.metadata.product_name = "parsedmarc"
    $e.event_type = "DMARC_AGGREGATE"
    $e.security_result.detection_fields.key = "dmarc.pass"
    $e.security_result.detection_fields.value = false
    $e.security_result.detection_fields.key = "dmarc.source_service_type"
    $e.security_result.detection_fields.value = "Email Provider"
  condition:
    $e
}
```
### Find SMTP TLS failures
```yara-l
rule smtp_tls_failures {
  meta:
    author = "parsedmarc"
    description = "Detect SMTP TLS failures"
  events:
    $e.metadata.product_name = "parsedmarc"
    $e.event_type = "SMTP_TLS_REPORT"
  condition:
    $e
}
```
## Privacy Considerations
By default, forensic report message bodies are **excluded** from the output to protect privacy. If you need to include message samples for investigation:
1. Set `include_ruf_payload = True` in your configuration
2. Adjust `ruf_payload_max_bytes` to limit the amount of data included (default: 4096 bytes)
3. Message samples will be truncated if they exceed the configured maximum
**Note**: Be aware of data privacy regulations (GDPR, CCPA, etc.) when including message payloads in security telemetry.
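The documentation does not specify how truncation handles multi-byte characters; a minimal sketch of byte-capped truncation, assuming UTF-8 samples (the `truncate_payload` helper name is hypothetical, not parsedmarc API):

```python
def truncate_payload(sample: str, max_bytes: int = 4096) -> str:
    """Cap a message sample at max_bytes of UTF-8 output.

    Slicing the encoded bytes can split a multi-byte character;
    errors="ignore" drops the dangling partial character on decode.
    """
    data = sample.encode("utf-8")[:max_bytes]
    return data.decode("utf-8", errors="ignore")


print(truncate_payload("héllo", 3))  # "hé" (the split byte of "l" is not reached; "é" fits in 2 bytes)
```

Capping on encoded bytes rather than characters keeps the telemetry event size predictable regardless of the sample's script.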
## Usage
The Google SecOps output works with all parsedmarc input methods, including file processing and mailbox monitoring.
### Primary Method: Direct API Ingestion
With Chronicle Ingestion API configured, events are sent directly to Chronicle:
```bash
# Process files - events are sent to Chronicle API automatically
parsedmarc -c config.ini samples/aggregate/*.xml
# Monitor mailbox - events are sent to Chronicle API in real-time
parsedmarc -c config.ini
```
No additional log shippers or pipelines are needed. The Google SecOps client handles authentication and batching automatically.
### Alternative Method: stdout Output with Log Shipper
With `use_stdout = True` in your configuration, parsedmarc writes UDM events to stdout so an external log shipper can collect them:
#### Processing Files
```bash
# Output to stdout
parsedmarc -c config.ini samples/aggregate/*.xml > dmarc_events.ndjson
# Stream to file
parsedmarc -c config.ini samples/aggregate/*.xml >> /var/log/dmarc/events.ndjson
# Pipe to log shipper (e.g., Fluentd, Logstash, Chronicle forwarder)
parsedmarc -c config.ini samples/aggregate/*.xml | fluentd
```
#### Monitoring Mailboxes
The Google SecOps output automatically works when monitoring mailboxes via IMAP, Microsoft Graph, or Gmail API. Configure your mailbox connection and enable watching:
```ini
[general]
save_aggregate = True
save_forensic = True
[mailbox]
watch = True
delete = False
batch_size = 10
[imap]
host = imap.example.com
user = dmarc@example.com
password = yourpassword
[google_secops]
# Use stdout mode for log shipper integration
use_stdout = True
include_ruf_payload = False
static_observer_name = mailbox-monitor
static_environment = prod
```
When watching a mailbox with stdout mode, parsedmarc continuously outputs UDM events as new reports arrive:
```bash
parsedmarc -c config.ini | fluentd
```
The output is in newline-delimited JSON format, with one UDM event per line, ready for collection by your log shipper.
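Because each line is a complete JSON object, downstream tooling stays simple. As a sketch, a small consumer (hypothetical, not part of parsedmarc) can tally events per line:

```python
import json


def severity_counts(ndjson_lines):
    """Tally UDM events by security_result severity, one JSON object per line."""
    counts = {}
    for line in ndjson_lines:
        line = line.strip()
        if not line:
            continue  # tolerate blank lines between events
        event = json.loads(line)
        for result in event.get("security_result", []):
            sev = result.get("severity", "UNKNOWN")
            counts[sev] = counts.get(sev, 0) + 1
    return counts


sample = '{"security_result": [{"severity": "LOW"}]}'
print(severity_counts([sample]))  # {'LOW': 1}
```

The same pattern works when reading `sys.stdin` at the end of a `parsedmarc ... | python consumer.py` pipeline.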


@@ -44,6 +44,7 @@ and Valimail.
 - Optionally send the results to Elasticsearch, Opensearch, and/or Splunk, for use
   with premade dashboards
 - Optionally send reports to Apache Kafka
+- Optionally send reports to Google SecOps (Chronicle) in UDM format
 ## Python Compatibility
@@ -74,6 +75,7 @@ elasticsearch
 opensearch
 kibana
 splunk
+google_secops
 davmail
 dmarc
 contributing


@@ -229,18 +229,6 @@ The full set of configuration options are:
   username, you must grant the app `Mail.ReadWrite.Shared`.
 :::
-:::{tip}
-When configuring folder names (e.g., `reports_folder`, `archive_folder`),
-you can use standard folder names like `Inbox`, `Archive`, `Sent Items`, etc.
-These will be automatically mapped to Microsoft Graph's well-known folder names,
-which works reliably across different mailbox locales and avoids issues with
-uninitialized or shared mailboxes. Supported folder names include:
-- English: Inbox, Sent Items, Deleted Items, Drafts, Junk Email, Archive, Outbox
-- German: Posteingang, Gesendete Elemente, Gelöschte Elemente, Entwürfe, Junk-E-Mail, Archiv
-- French: Boîte de réception, Éléments envoyés, Éléments supprimés, Brouillons, Courrier indésirable, Archives
-- Spanish: Bandeja de entrada, Elementos enviados, Elementos eliminados, Borradores, Correo no deseado
-:::
 :::{warning}
 If you are using the `ClientSecret` auth method, you need to
 grant the `Mail.ReadWrite` (application) permission to the


@@ -892,7 +892,11 @@ def extract_report(content: Union[bytes, str, BinaryIO]) -> str:
     try:
         if isinstance(content, str):
             try:
-                file_object = BytesIO(b64decode(content))
+                file_object = BytesIO(
+                    b64decode(
+                        content.replace("\n", "").replace("\r", ""), validate=True
+                    )
+                )
             except binascii.Error:
                 return content
         header = file_object.read(6)


@@ -27,6 +27,7 @@ from parsedmarc import (
     gelf,
     get_dmarc_reports_from_mailbox,
     get_dmarc_reports_from_mbox,
+    google_secops,
     kafkaclient,
     loganalytics,
     opensearch,
@@ -284,6 +285,14 @@ def _main():
         except Exception as error_:
             logger.error("GELF Error: {0}".format(error_.__str__()))
         try:
+            if opts.google_secops:
+                events = google_secops_client.save_aggregate_report_to_google_secops(report)
+                for event in events:
+                    print(event)
+        except Exception as error_:
+            logger.error("Google SecOps Error: {0}".format(error_.__str__()))
+        try:
             if opts.webhook_aggregate_url:
                 indent_value = 2 if opts.prettify_json else None
@@ -369,6 +378,14 @@ def _main():
         except Exception as error_:
             logger.error("GELF Error: {0}".format(error_.__str__()))
         try:
+            if opts.google_secops:
+                events = google_secops_client.save_forensic_report_to_google_secops(report)
+                for event in events:
+                    print(event)
+        except Exception as error_:
+            logger.error("Google SecOps Error: {0}".format(error_.__str__()))
+        try:
             if opts.webhook_forensic_url:
                 indent_value = 2 if opts.prettify_json else None
@@ -454,6 +471,14 @@ def _main():
         except Exception as error_:
             logger.error("GELF Error: {0}".format(error_.__str__()))
         try:
+            if opts.google_secops:
+                events = google_secops_client.save_smtp_tls_report_to_google_secops(report)
+                for event in events:
+                    print(event)
+        except Exception as error_:
+            logger.error("Google SecOps Error: {0}".format(error_.__str__()))
+        try:
             if opts.webhook_smtp_tls_url:
                 indent_value = 2 if opts.prettify_json else None
@@ -722,6 +747,17 @@ def _main():
         gelf_host=None,
         gelf_port=None,
         gelf_mode=None,
+        google_secops=False,
+        google_secops_include_ruf_payload=False,
+        google_secops_ruf_payload_max_bytes=4096,
+        google_secops_static_observer_name=None,
+        google_secops_static_observer_vendor="parsedmarc",
+        google_secops_static_environment=None,
+        google_secops_api_credentials_file=None,
+        google_secops_api_customer_id=None,
+        google_secops_api_region="us",
+        google_secops_api_log_type="DMARC",
+        google_secops_use_stdout=False,
         webhook_aggregate_url=None,
         webhook_forensic_url=None,
         webhook_smtp_tls_url=None,
@@ -1301,6 +1337,50 @@ def _main():
             logger.critical("mode setting missing from the gelf config section")
             exit(-1)
+    if "google_secops" in config.sections():
+        google_secops_config = config["google_secops"]
+        opts.google_secops = True
+        if "include_ruf_payload" in google_secops_config:
+            opts.google_secops_include_ruf_payload = bool(
+                google_secops_config.getboolean("include_ruf_payload")
+            )
+        if "ruf_payload_max_bytes" in google_secops_config:
+            opts.google_secops_ruf_payload_max_bytes = google_secops_config.getint(
+                "ruf_payload_max_bytes"
+            )
+        if "static_observer_name" in google_secops_config:
+            opts.google_secops_static_observer_name = google_secops_config[
+                "static_observer_name"
+            ]
+        if "static_observer_vendor" in google_secops_config:
+            opts.google_secops_static_observer_vendor = google_secops_config[
+                "static_observer_vendor"
+            ]
+        if "static_environment" in google_secops_config:
+            opts.google_secops_static_environment = google_secops_config[
+                "static_environment"
+            ]
+        if "api_credentials_file" in google_secops_config:
+            opts.google_secops_api_credentials_file = google_secops_config[
+                "api_credentials_file"
+            ]
+        if "api_customer_id" in google_secops_config:
+            opts.google_secops_api_customer_id = google_secops_config[
+                "api_customer_id"
+            ]
+        if "api_region" in google_secops_config:
+            opts.google_secops_api_region = google_secops_config["api_region"]
+        if "api_log_type" in google_secops_config:
+            opts.google_secops_api_log_type = google_secops_config["api_log_type"]
+        if "use_stdout" in google_secops_config:
+            opts.google_secops_use_stdout = google_secops_config.getboolean(
+                "use_stdout"
+            )
     if "webhook" in config.sections():
         webhook_config = config["webhook"]
         if "aggregate_url" in webhook_config:
@@ -1479,6 +1559,23 @@ def _main():
         except Exception as error_:
             logger.error("GELF Error: {0}".format(error_.__str__()))
+    if opts.google_secops:
+        try:
+            google_secops_client = google_secops.GoogleSecOpsClient(
+                include_ruf_payload=opts.google_secops_include_ruf_payload,
+                ruf_payload_max_bytes=opts.google_secops_ruf_payload_max_bytes,
+                static_observer_name=opts.google_secops_static_observer_name,
+                static_observer_vendor=opts.google_secops_static_observer_vendor,
+                static_environment=opts.google_secops_static_environment,
+                api_credentials_file=opts.google_secops_api_credentials_file,
+                api_customer_id=opts.google_secops_api_customer_id,
+                api_region=opts.google_secops_api_region,
+                api_log_type=opts.google_secops_api_log_type,
+                use_stdout=opts.google_secops_use_stdout,
+            )
+        except Exception as error_:
+            logger.error("Google SecOps Error: {0}".format(error_.__str__()))
     if (
         opts.webhook_aggregate_url
         or opts.webhook_forensic_url


@@ -1,3 +1,3 @@
-__version__ = "9.0.8"
+__version__ = "9.0.9"
 USER_AGENT = f"parsedmarc/{__version__}"

parsedmarc/google_secops.py (new file, 709 lines)

@@ -0,0 +1,709 @@
# -*- coding: utf-8 -*-
"""Google SecOps (Chronicle) output module for parsedmarc"""

from __future__ import annotations

import json
from datetime import datetime, timezone
from typing import Any, Optional

import requests
from google.auth.transport.requests import Request
from google.oauth2 import service_account

from parsedmarc.constants import USER_AGENT
from parsedmarc.log import logger
from parsedmarc.utils import human_timestamp_to_datetime


class GoogleSecOpsError(RuntimeError):
    """Raised when a Google SecOps API error occurs"""


class GoogleSecOpsClient:
    """A client for Google SecOps (Chronicle) UDM output"""

    def __init__(
        self,
        include_ruf_payload: bool = False,
        ruf_payload_max_bytes: int = 4096,
        static_observer_name: Optional[str] = None,
        static_observer_vendor: str = "parsedmarc",
        static_environment: Optional[str] = None,
        api_credentials_file: Optional[str] = None,
        api_customer_id: Optional[str] = None,
        api_region: str = "us",
        api_log_type: str = "DMARC",
        use_stdout: bool = False,
    ):
        """
        Initializes the GoogleSecOpsClient

        Args:
            include_ruf_payload: Include RUF message payload in output
            ruf_payload_max_bytes: Maximum bytes of RUF payload to include
            static_observer_name: Static observer name for telemetry
            static_observer_vendor: Static observer vendor (default: parsedmarc)
            static_environment: Static environment (prod/dev/custom string)
            api_credentials_file: Path to Google service account JSON credentials
            api_customer_id: Chronicle customer ID (required for API)
            api_region: Chronicle region (us, europe, asia-southeast1, etc.)
            api_log_type: Log type for Chronicle ingestion (default: DMARC)
            use_stdout: Output to stdout instead of API (default: False)
        """
        self.include_ruf_payload = include_ruf_payload
        self.ruf_payload_max_bytes = ruf_payload_max_bytes
        self.static_observer_name = static_observer_name
        self.static_observer_vendor = static_observer_vendor
        self.static_environment = static_environment
        self.use_stdout = use_stdout
        # API configuration
        self.api_credentials_file = api_credentials_file
        self.api_customer_id = api_customer_id
        self.api_region = api_region
        self.api_log_type = api_log_type
        self.credentials = None
        self.session = None
        # Initialize API client if not using stdout
        if not self.use_stdout:
            if not self.api_credentials_file or not self.api_customer_id:
                raise GoogleSecOpsError(
                    "api_credentials_file and api_customer_id are required "
                    "when not using stdout. Set use_stdout=True to output to "
                    "stdout instead."
                )
            self._initialize_api_client()
    def _initialize_api_client(self):
        """Initialize the Chronicle API client with authentication"""
        try:
            logger.debug("Initializing Chronicle API client")
            # Load service account credentials
            self.credentials = service_account.Credentials.from_service_account_file(
                self.api_credentials_file,
                scopes=["https://www.googleapis.com/auth/cloud-platform"],
            )
            # Create session with authentication
            self.session = requests.Session()
            self.session.headers.update({"User-Agent": USER_AGENT})
            logger.info("Chronicle API client initialized successfully")
        except Exception as e:
            raise GoogleSecOpsError(f"Failed to initialize Chronicle API client: {e}")

    def _get_api_endpoint(self) -> str:
        """Get the Chronicle Ingestion API endpoint based on region"""
        return (
            f"https://{self.api_region}-chronicle.googleapis.com/v1alpha/"
            f"projects/{self.api_customer_id}/locations/{self.api_region}/"
            f"instances/default/logTypes/{self.api_log_type}/logs:import"
        )
    def _send_events_to_api(self, events: list[str]) -> None:
        """
        Send UDM events to Chronicle Ingestion API

        Args:
            events: List of NDJSON event strings
        """
        if not events:
            return
        try:
            # Refresh credentials if needed
            if not self.credentials.valid:
                self.credentials.refresh(Request())
            # Prepare request
            endpoint = self._get_api_endpoint()
            headers = {
                "Authorization": f"Bearer {self.credentials.token}",
                "Content-Type": "application/json",
            }
            # Chronicle expects events in inline_source format
            # Each event should be a separate log entry
            payload = {
                "inline_source": {"logs": [json.loads(event) for event in events]}
            }
            logger.debug(f"Sending {len(events)} events to Chronicle API")
            response = self.session.post(
                endpoint,
                headers=headers,
                json=payload,
                timeout=60,
            )
            if response.status_code == 200:
                logger.info(f"Successfully sent {len(events)} events to Chronicle")
            else:
                error_msg = (
                    f"Chronicle API error: {response.status_code} - {response.text}"
                )
                logger.error(error_msg)
                raise GoogleSecOpsError(error_msg)
        except GoogleSecOpsError:
            raise
        except Exception as e:
            raise GoogleSecOpsError(f"Failed to send events to Chronicle API: {e}")

    def _output_events(self, events: list[str]) -> None:
        """
        Output events either to stdout or Chronicle API

        Args:
            events: List of NDJSON event strings
        """
        if self.use_stdout:
            # Output to stdout for collection by external log shippers
            for event in events:
                print(event)
        else:
            # Send directly to Chronicle API
            self._send_events_to_api(events)
def _get_severity(self, disposition: str, spf_aligned: bool, dkim_aligned: bool) -> str:
"""
Derive severity from DMARC disposition and alignment
Args:
disposition: DMARC policy disposition
spf_aligned: Whether SPF is aligned
dkim_aligned: Whether DKIM is aligned
Returns:
Severity level: HIGH, MEDIUM, or LOW
"""
if disposition == "reject":
return "HIGH"
elif disposition == "quarantine" and not (spf_aligned or dkim_aligned):
return "MEDIUM"
else:
return "LOW"
def _get_description(
self,
dmarc_pass: bool,
spf_result: Optional[str],
dkim_result: Optional[str],
spf_aligned: bool,
dkim_aligned: bool,
disposition: str,
) -> str:
"""
Generate description for the event
Args:
dmarc_pass: Whether DMARC passed
spf_result: SPF result
dkim_result: DKIM result
spf_aligned: Whether SPF is aligned
dkim_aligned: Whether DKIM is aligned
disposition: DMARC disposition
Returns:
Human-readable description
"""
parts = []
if dmarc_pass:
parts.append("DMARC pass")
else:
parts.append("DMARC fail")
if spf_result:
parts.append(f"SPF={spf_result}")
if dkim_result:
parts.append(f"DKIM={dkim_result}")
if spf_aligned:
parts.append("SPF aligned")
elif spf_result:
parts.append("SPF not aligned")
if dkim_aligned:
parts.append("DKIM aligned")
elif dkim_result:
parts.append("DKIM not aligned")
parts.append(f"disposition={disposition}")
return "; ".join(parts)
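For a record that fails DMARC under a quarantine policy, the builder above yields a string like the following. This is a standalone sketch of the same concatenation with illustrative inputs (it assumes both auth results are present, so the "not aligned" branches always apply):

```python
# Illustrative inputs for a failing record.
dmarc_pass, disposition = False, "quarantine"
spf_result, spf_aligned = "fail", False
dkim_result, dkim_aligned = "pass", True

parts = ["DMARC pass" if dmarc_pass else "DMARC fail"]
parts.append(f"SPF={spf_result}")
parts.append(f"DKIM={dkim_result}")
parts.append("SPF aligned" if spf_aligned else "SPF not aligned")
parts.append("DKIM aligned" if dkim_aligned else "DKIM not aligned")
parts.append(f"disposition={disposition}")

print("; ".join(parts))
# DMARC fail; SPF=fail; DKIM=pass; SPF not aligned; DKIM aligned; disposition=quarantine
```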
def _format_timestamp(self, timestamp_str: str) -> str:
"""
Convert parsedmarc timestamp to RFC 3339 format
Args:
timestamp_str: Timestamp string from parsedmarc
Returns:
RFC 3339 formatted timestamp
"""
try:
dt = human_timestamp_to_datetime(timestamp_str)
# Ensure timezone-aware datetime
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt.isoformat()
except Exception:
# Fallback to current time if parsing fails
return datetime.now(timezone.utc).isoformat()
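A standalone stand-in for the conversion above, assuming the common `"YYYY-MM-DD HH:MM:SS"` timestamp shape; the real `human_timestamp_to_datetime` helper accepts looser inputs:

```python
from datetime import datetime, timezone

def to_rfc3339(timestamp_str: str) -> str:
    # Fixed-format stand-in for human_timestamp_to_datetime.
    try:
        dt = datetime.strptime(timestamp_str, "%Y-%m-%d %H:%M:%S")
        if dt.tzinfo is None:
            # Ensure a timezone-aware datetime, as in _format_timestamp.
            dt = dt.replace(tzinfo=timezone.utc)
        return dt.isoformat()
    except Exception:
        # Same fallback as above: current time if parsing fails.
        return datetime.now(timezone.utc).isoformat()

print(to_rfc3339("2018-06-19 00:00:00"))  # 2018-06-19T00:00:00+00:00
```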
def save_aggregate_report_to_google_secops(
self, aggregate_report: dict[str, Any]
) -> list[str]:
"""
Convert aggregate DMARC report to Google SecOps UDM format and send to Chronicle
When use_stdout=False: Events are sent to Chronicle API, returns empty list
When use_stdout=True: Returns list of NDJSON event strings for stdout
Args:
aggregate_report: Aggregate report dictionary from parsedmarc
Returns:
List of NDJSON event strings (empty if sent to API)
"""
logger.debug("Converting aggregate report to Google SecOps UDM format")
events = []
try:
report_metadata = aggregate_report["report_metadata"]
policy_published = aggregate_report["policy_published"]
for record in aggregate_report["records"]:
# Extract values
source_ip = record["source"]["ip_address"]
source_country = record["source"].get("country")
source_reverse_dns = record["source"].get("reverse_dns")
source_base_domain = record["source"].get("base_domain")
source_name = record["source"].get("name")
source_type = record["source"].get("type")
header_from = record["identifiers"]["header_from"]
envelope_from = record["identifiers"]["envelope_from"]
disposition = record["policy_evaluated"]["disposition"]
spf_aligned = record["alignment"]["spf"]
dkim_aligned = record["alignment"]["dkim"]
dmarc_pass = record["alignment"]["dmarc"]
count = record["count"]
interval_begin = record["interval_begin"]
# Get auth results
spf_results = record["auth_results"].get("spf", [])
dkim_results = record["auth_results"].get("dkim", [])
spf_result = spf_results[0]["result"] if spf_results else None
dkim_result = dkim_results[0]["result"] if dkim_results else None
# Build UDM event
event: dict[str, Any] = {
"event_type": "DMARC_AGGREGATE",
"metadata": {
"event_timestamp": self._format_timestamp(interval_begin),
"event_type": "GENERIC_EVENT",
"product_name": "parsedmarc",
"vendor_name": self.static_observer_vendor,
},
"principal": {
"ip": [source_ip],
},
"target": {
"domain": {
"name": header_from,
}
},
"security_result": [
{
"severity": self._get_severity(
disposition, spf_aligned, dkim_aligned
),
"description": self._get_description(
dmarc_pass,
spf_result,
dkim_result,
spf_aligned,
dkim_aligned,
disposition,
),
"detection_fields": [
{"key": "dmarc.disposition", "value": disposition},
{"key": "dmarc.policy", "value": policy_published["p"]},
{"key": "dmarc.pass", "value": dmarc_pass},
{"key": "dmarc.spf_aligned", "value": spf_aligned},
{"key": "dmarc.dkim_aligned", "value": dkim_aligned},
{"key": "dmarc.header_from", "value": header_from},
{"key": "dmarc.envelope_from", "value": envelope_from},
{"key": "dmarc.report_org", "value": report_metadata["org_name"]},
{"key": "dmarc.report_id", "value": report_metadata["report_id"]},
{"key": "dmarc.report_begin", "value": report_metadata["begin_date"]},
{"key": "dmarc.report_end", "value": report_metadata["end_date"]},
{"key": "dmarc.row_count", "value": count},
],
}
],
}
# Add optional fields to detection_fields
if spf_result:
event["security_result"][0]["detection_fields"].append(
{"key": "dmarc.spf_result", "value": spf_result}
)
if dkim_result:
event["security_result"][0]["detection_fields"].append(
{"key": "dmarc.dkim_result", "value": dkim_result}
)
if source_name:
event["security_result"][0]["detection_fields"].append(
{"key": "dmarc.source_service_name", "value": source_name}
)
if source_type:
event["security_result"][0]["detection_fields"].append(
{"key": "dmarc.source_service_type", "value": source_type}
)
# Add optional context fields (low-value, not for dashboarding)
additional_context = []
if source_base_domain:
additional_context.append(
{"key": "source_base_domain", "value": source_base_domain}
)
if self.static_environment:
additional_context.append(
{"key": "environment", "value": self.static_environment}
)
# Add SPF auth results to context
if spf_results:
for idx, spf in enumerate(spf_results):
additional_context.append(
{"key": f"spf_{idx}_domain", "value": spf.get("domain", "")}
)
additional_context.append(
{"key": f"spf_{idx}_result", "value": spf.get("result", "")}
)
# Add DKIM auth results to context
if dkim_results:
for idx, dkim in enumerate(dkim_results):
additional_context.append(
{"key": f"dkim_{idx}_domain", "value": dkim.get("domain", "")}
)
additional_context.append(
{"key": f"dkim_{idx}_result", "value": dkim.get("result", "")}
)
# Only add additional section if there's context to include
if additional_context:
event["additional"] = {"fields": additional_context}
# Add optional UDM fields
if source_country:
event["principal"]["location"] = {"country_or_region": source_country}
if source_reverse_dns:
event["principal"]["hostname"] = source_reverse_dns
if self.static_observer_name:
event["metadata"]["product_deployment_id"] = self.static_observer_name
events.append(json.dumps(event, ensure_ascii=False))
except Exception as e:
logger.error(f"Error converting aggregate report to Google SecOps format: {e}")
# Generate error event
error_event: dict[str, Any] = {
"event_type": "DMARC_PARSE_ERROR",
"metadata": {
"event_timestamp": datetime.now(timezone.utc).isoformat(),
"event_type": "GENERIC_EVENT",
"product_name": "parsedmarc",
"vendor_name": self.static_observer_vendor,
},
"security_result": [
{
"severity": "ERROR",
"description": f"Failed to parse DMARC aggregate report: {str(e)}",
}
],
}
events.append(json.dumps(error_event, ensure_ascii=False))
# Output events (to stdout or API)
self._output_events(events)
# Return events only if using stdout (for CLI to print)
return events if self.use_stdout else []
def save_forensic_report_to_google_secops(
self, forensic_report: dict[str, Any]
) -> list[str]:
"""
Convert forensic DMARC report to Google SecOps UDM format and send to Chronicle
When use_stdout=False: Events are sent to Chronicle API, returns empty list
When use_stdout=True: Returns list of NDJSON event strings for stdout
Args:
forensic_report: Forensic report dictionary from parsedmarc
Returns:
List of NDJSON event strings (empty if sent to API)
"""
logger.debug("Converting forensic report to Google SecOps UDM format")
events = []
try:
source_ip = forensic_report["source"]["ip_address"]
source_country = forensic_report["source"].get("country")
source_reverse_dns = forensic_report["source"].get("reverse_dns")
source_base_domain = forensic_report["source"].get("base_domain")
source_name = forensic_report["source"].get("name")
source_type = forensic_report["source"].get("type")
reported_domain = forensic_report["reported_domain"]
arrival_date = forensic_report["arrival_date_utc"]
auth_failure = forensic_report.get("auth_failure", [])
# Determine severity - forensic reports indicate failures
# Default to MEDIUM for authentication failures
severity = "MEDIUM"
# Build description
auth_failure_str = ", ".join(auth_failure) if auth_failure else "unknown"
description = f"DMARC forensic report: authentication failure ({auth_failure_str})"
# Build UDM event
event: dict[str, Any] = {
"event_type": "DMARC_FORENSIC",
"metadata": {
"event_timestamp": self._format_timestamp(arrival_date),
"event_type": "GENERIC_EVENT",
"product_name": "parsedmarc",
"vendor_name": self.static_observer_vendor,
},
"principal": {
"ip": [source_ip],
},
"target": {
"domain": {
"name": reported_domain,
}
},
"security_result": [
{
"severity": severity,
"description": description,
"detection_fields": [
{"key": "dmarc.auth_failure", "value": auth_failure_str},
{"key": "dmarc.reported_domain", "value": reported_domain},
],
}
],
}
# Add optional fields to detection_fields (matching aggregate report field names)
if source_name:
event["security_result"][0]["detection_fields"].append(
{"key": "dmarc.source_service_name", "value": source_name}
)
if source_type:
event["security_result"][0]["detection_fields"].append(
{"key": "dmarc.source_service_type", "value": source_type}
)
# Add optional context fields (low-value, not for dashboarding)
additional_context = []
if source_base_domain:
additional_context.append(
{"key": "source_base_domain", "value": source_base_domain}
)
if forensic_report.get("feedback_type"):
additional_context.append(
{"key": "feedback_type", "value": forensic_report["feedback_type"]}
)
if forensic_report.get("message_id"):
additional_context.append(
{"key": "message_id", "value": forensic_report["message_id"]}
)
if forensic_report.get("authentication_results"):
additional_context.append(
{"key": "authentication_results", "value": forensic_report["authentication_results"]}
)
if forensic_report.get("delivery_result"):
additional_context.append(
{"key": "delivery_result", "value": forensic_report["delivery_result"]}
)
if self.static_environment:
additional_context.append(
{"key": "environment", "value": self.static_environment}
)
# Add payload excerpt if enabled
if self.include_ruf_payload and forensic_report.get("sample"):
sample = forensic_report["sample"]
if len(sample) > self.ruf_payload_max_bytes:
sample = sample[:self.ruf_payload_max_bytes] + "... [truncated]"
additional_context.append(
{"key": "message_sample", "value": sample}
)
# Only add additional section if there's context to include
if additional_context:
event["additional"] = {"fields": additional_context}
# Add optional UDM fields
if source_country:
event["principal"]["location"] = {"country_or_region": source_country}
if source_reverse_dns:
event["principal"]["hostname"] = source_reverse_dns
if self.static_observer_name:
event["metadata"]["product_deployment_id"] = self.static_observer_name
events.append(json.dumps(event, ensure_ascii=False))
except Exception as e:
logger.error(f"Error converting forensic report to Google SecOps format: {e}")
# Generate error event
error_event: dict[str, Any] = {
"event_type": "DMARC_PARSE_ERROR",
"metadata": {
"event_timestamp": datetime.now(timezone.utc).isoformat(),
"event_type": "GENERIC_EVENT",
"product_name": "parsedmarc",
"vendor_name": self.static_observer_vendor,
},
"security_result": [
{
"severity": "ERROR",
"description": f"Failed to parse DMARC forensic report: {str(e)}",
}
],
}
events.append(json.dumps(error_event, ensure_ascii=False))
# Output events (to stdout or API)
self._output_events(events)
# Return events only if using stdout (for CLI to print)
return events if self.use_stdout else []
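The RUF payload truncation used in the forensic path above, isolated for reference. Note that the cap is applied with `len()` on the string, i.e. a character count rather than encoded bytes, despite the `ruf_payload_max_bytes` name:

```python
def truncate_sample(sample: str, max_bytes: int) -> str:
    # Mirrors the message_sample handling: cap the excerpt and mark it.
    if len(sample) > max_bytes:
        return sample[:max_bytes] + "... [truncated]"
    return sample

print(truncate_sample("A" * 8, 4))     # AAAA... [truncated]
print(truncate_sample("short", 4096))  # short
```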
def save_smtp_tls_report_to_google_secops(
self, smtp_tls_report: dict[str, Any]
) -> list[str]:
"""
Convert SMTP TLS report to Google SecOps UDM format and send to Chronicle
When use_stdout=False: Events are sent to Chronicle API, returns empty list
When use_stdout=True: Returns list of NDJSON event strings for stdout
Args:
smtp_tls_report: SMTP TLS report dictionary from parsedmarc
Returns:
List of NDJSON event strings (empty if sent to API)
"""
logger.debug("Converting SMTP TLS report to Google SecOps UDM format")
events = []
try:
organization_name = smtp_tls_report.get("organization_name", "")
begin_date = smtp_tls_report["begin_date"]
end_date = smtp_tls_report["end_date"]
for policy in smtp_tls_report.get("policies", []):
policy_domain = policy["policy_domain"]
for failure in policy.get("failure_details", []):
# Build UDM event for each failure
event: dict[str, Any] = {
"event_type": "SMTP_TLS_REPORT",
"metadata": {
"event_timestamp": self._format_timestamp(begin_date),
"event_type": "GENERIC_EVENT",
"product_name": "parsedmarc",
"vendor_name": self.static_observer_vendor,
},
"target": {
"domain": {
"name": policy_domain,
}
},
"security_result": [
{
"severity": "LOW",
"description": f"SMTP TLS failure: {failure.get('result_type', 'unknown')}",
"detection_fields": [
{"key": "smtp_tls.policy_domain", "value": policy_domain},
{"key": "smtp_tls.result_type", "value": failure.get("result_type", "")},
{"key": "smtp_tls.failed_session_count", "value": failure.get("failed_session_count", 0)},
{"key": "smtp_tls.report_org", "value": organization_name},
{"key": "smtp_tls.report_begin", "value": begin_date},
{"key": "smtp_tls.report_end", "value": end_date},
],
}
],
}
# Add optional context fields (low-value, not for dashboarding)
additional_context = []
if self.static_environment:
additional_context.append(
{"key": "environment", "value": self.static_environment}
)
# Only add additional section if there's context to include
if additional_context:
event["additional"] = {"fields": additional_context}
# Add optional UDM fields
if failure.get("sending_mta_ip"):
event["principal"] = {"ip": [failure["sending_mta_ip"]]}
if self.static_observer_name:
event["metadata"]["product_deployment_id"] = self.static_observer_name
events.append(json.dumps(event, ensure_ascii=False))
except Exception as e:
logger.error(f"Error converting SMTP TLS report to Google SecOps format: {e}")
# Generate error event
error_event: dict[str, Any] = {
"event_type": "DMARC_PARSE_ERROR",
"metadata": {
"event_timestamp": datetime.now(timezone.utc).isoformat(),
"event_type": "GENERIC_EVENT",
"product_name": "parsedmarc",
"vendor_name": self.static_observer_vendor,
},
"security_result": [
{
"severity": "ERROR",
"description": f"Failed to parse SMTP TLS report: {str(e)}",
}
],
}
events.append(json.dumps(error_event, ensure_ascii=False))
# Output events (to stdout or API)
self._output_events(events)
# Return events only if using stdout (for CLI to print)
return events if self.use_stdout else []


@@ -20,59 +20,6 @@ from msgraph.core import GraphClient
from parsedmarc.log import logger
from parsedmarc.mail.mailbox_connection import MailboxConnection
# Mapping of common folder names to Microsoft Graph well-known folder names
# This avoids the "Default folder Root not found" error on uninitialized mailboxes
WELL_KNOWN_FOLDER_MAP = {
# English names
"inbox": "inbox",
"sent items": "sentitems",
"sent": "sentitems",
"sentitems": "sentitems",
"deleted items": "deleteditems",
"deleted": "deleteditems",
"deleteditems": "deleteditems",
"trash": "deleteditems",
"drafts": "drafts",
"junk email": "junkemail",
"junk": "junkemail",
"junkemail": "junkemail",
"spam": "junkemail",
"archive": "archive",
"outbox": "outbox",
"conversation history": "conversationhistory",
"conversationhistory": "conversationhistory",
# German names
"posteingang": "inbox",
"gesendete elemente": "sentitems",
"gesendet": "sentitems",
"gelöschte elemente": "deleteditems",
"gelöscht": "deleteditems",
"entwürfe": "drafts",
"junk-e-mail": "junkemail",
"archiv": "archive",
"postausgang": "outbox",
# French names
"boîte de réception": "inbox",
"éléments envoyés": "sentitems",
"envoyés": "sentitems",
"éléments supprimés": "deleteditems",
"supprimés": "deleteditems",
"brouillons": "drafts",
"courrier indésirable": "junkemail",
"archives": "archive",
"boîte d'envoi": "outbox",
# Spanish names
"bandeja de entrada": "inbox",
"elementos enviados": "sentitems",
"enviados": "sentitems",
"elementos eliminados": "deleteditems",
"eliminados": "deleteditems",
"borradores": "drafts",
"correo no deseado": "junkemail",
"archivar": "archive",
"bandeja de salida": "outbox",
}
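Lookups against this map are performed on the lowercased configured folder name, so localized and mixed-case names all resolve to the same Graph well-known identifier. A quick sketch with a trimmed copy of the table:

```python
# Trimmed, illustrative copy of WELL_KNOWN_FOLDER_MAP.
folder_map = {
    "inbox": "inbox",
    "sent items": "sentitems",
    "posteingang": "inbox",  # German
    "archives": "archive",   # French
}

for configured in ("Inbox", "Posteingang", "custom_folder"):
    print(configured, "->", folder_map.get(configured.lower()))
# Inbox -> inbox
# Posteingang -> inbox
# custom_folder -> None
```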
class AuthMethod(Enum):
DeviceCode = 1
@@ -183,13 +130,6 @@ class MSGraphConnection(MailboxConnection):
self.mailbox_name = mailbox
def create_folder(self, folder_name: str):
# Check if this is a well-known folder - they already exist and cannot be created
if "/" not in folder_name:
well_known_name = WELL_KNOWN_FOLDER_MAP.get(folder_name.lower())
if well_known_name:
logger.debug(f"Folder '{folder_name}' is a well-known folder, skipping creation")
return
sub_url = ""
path_parts = folder_name.split("/")
if len(path_parts) > 1: # Folder is a subFolder
@@ -306,12 +246,6 @@ class MSGraphConnection(MailboxConnection):
parent_folder_id = folder_id
return self._find_folder_id_with_parent(path_parts[-1], parent_folder_id)
else:
# Check if this is a well-known folder name (case-insensitive)
well_known_name = WELL_KNOWN_FOLDER_MAP.get(folder_name.lower())
if well_known_name:
# Use well-known folder name directly to avoid querying uninitialized mailboxes
logger.debug(f"Using well-known folder name '{well_known_name}' for '{folder_name}'")
return well_known_name
return self._find_folder_id_with_parent(folder_name, None)
def _find_folder_id_with_parent(

tests.py

@@ -3,6 +3,7 @@
from __future__ import absolute_import, print_function, unicode_literals
import json
import os
import unittest
from glob import glob
@@ -156,31 +157,168 @@ class Test(unittest.TestCase):
parsedmarc.parsed_smtp_tls_reports_to_csv(parsed_report)
print("Passed!")
def testMSGraphWellKnownFolders(self):
"""Test MSGraph well-known folder name mapping"""
from parsedmarc.mail.graph import WELL_KNOWN_FOLDER_MAP
def testGoogleSecOpsAggregateReport(self):
"""Test Google SecOps aggregate report conversion"""
print()
from parsedmarc.google_secops import GoogleSecOpsClient
# Test English folder names
assert WELL_KNOWN_FOLDER_MAP.get("inbox") == "inbox"
assert WELL_KNOWN_FOLDER_MAP.get("sent items") == "sentitems"
assert WELL_KNOWN_FOLDER_MAP.get("deleted items") == "deleteditems"
assert WELL_KNOWN_FOLDER_MAP.get("archive") == "archive"
client = GoogleSecOpsClient(use_stdout=True)
sample_path = "samples/aggregate/example.net!example.com!1529366400!1529452799.xml"
print("Testing Google SecOps aggregate conversion for {0}: ".format(sample_path), end="")
# Test case insensitivity - simulating how the code actually uses it
# This is what happens when user config has "reports_folder = Inbox"
assert WELL_KNOWN_FOLDER_MAP.get("inbox") == "inbox"
assert WELL_KNOWN_FOLDER_MAP.get("Inbox".lower()) == "inbox" # User's exact config
assert WELL_KNOWN_FOLDER_MAP.get("INBOX".lower()) == "inbox"
assert WELL_KNOWN_FOLDER_MAP.get("Archive".lower()) == "archive"
parsed_file = parsedmarc.parse_report_file(sample_path, always_use_local_files=True)
parsed_report = parsed_file["report"]
# Test German folder names
assert WELL_KNOWN_FOLDER_MAP.get("posteingang") == "inbox"
assert WELL_KNOWN_FOLDER_MAP.get("Posteingang".lower()) == "inbox" # Capitalized
assert WELL_KNOWN_FOLDER_MAP.get("archiv") == "archive"
events = client.save_aggregate_report_to_google_secops(parsed_report)
# Test that custom folders don't match
assert WELL_KNOWN_FOLDER_MAP.get("custom_folder") is None
assert WELL_KNOWN_FOLDER_MAP.get("my_reports") is None
# Verify we got events
assert len(events) > 0, "Expected at least one event"
# Verify each event is valid JSON
for event in events:
event_dict = json.loads(event)
assert "event_type" in event_dict
assert event_dict["event_type"] == "DMARC_AGGREGATE"
assert "metadata" in event_dict
assert "principal" in event_dict
assert "target" in event_dict
assert "security_result" in event_dict
print("Passed!")
def testGoogleSecOpsForensicReport(self):
"""Test Google SecOps forensic report conversion"""
print()
from parsedmarc.google_secops import GoogleSecOpsClient
# Test without payload
client = GoogleSecOpsClient(include_ruf_payload=False, use_stdout=True)
sample_path = "samples/forensic/dmarc_ruf_report_linkedin.eml"
print("Testing Google SecOps forensic conversion (no payload) for {0}: ".format(sample_path), end="")
parsed_file = parsedmarc.parse_report_file(sample_path)
parsed_report = parsed_file["report"]
events = client.save_forensic_report_to_google_secops(parsed_report)
# Verify we got events
assert len(events) > 0, "Expected at least one event"
# Verify each event is valid JSON
for event in events:
event_dict = json.loads(event)
assert "event_type" in event_dict
assert event_dict["event_type"] == "DMARC_FORENSIC"
# Verify no payload in additional fields
if "additional" in event_dict and "fields" in event_dict["additional"]:
for field in event_dict["additional"]["fields"]:
assert field["key"] != "message_sample", "Payload should not be included when disabled"
print("Passed!")
# Test with payload
client_with_payload = GoogleSecOpsClient(
include_ruf_payload=True,
ruf_payload_max_bytes=100,
use_stdout=True
)
print("Testing Google SecOps forensic conversion (with payload) for {0}: ".format(sample_path), end="")
events_with_payload = client_with_payload.save_forensic_report_to_google_secops(parsed_report)
# Verify we got events
assert len(events_with_payload) > 0, "Expected at least one event"
# Verify payload is included
for event in events_with_payload:
event_dict = json.loads(event)
# Check if message_sample is in additional fields
has_sample = False
if "additional" in event_dict and "fields" in event_dict["additional"]:
for field in event_dict["additional"]["fields"]:
if field["key"] == "message_sample":
has_sample = True
# Verify truncation: max_bytes (100) + "... [truncated]" suffix (15 chars)
# Allow some margin for the actual payload length
max_expected_length = 100 + len("... [truncated]") + 10
assert len(field["value"]) <= max_expected_length, f"Payload should be truncated, got {len(field['value'])} bytes"
break
assert has_sample, "Payload should be included when enabled"
print("Passed!")
def testGoogleSecOpsConfiguration(self):
"""Test Google SecOps client configuration"""
print()
from parsedmarc.google_secops import GoogleSecOpsClient
print("Testing Google SecOps client configuration: ", end="")
# Test stdout configuration
client1 = GoogleSecOpsClient(use_stdout=True)
assert client1.include_ruf_payload is False
assert client1.ruf_payload_max_bytes == 4096
assert client1.static_observer_vendor == "parsedmarc"
assert client1.static_observer_name is None
assert client1.static_environment is None
assert client1.use_stdout is True
# Test custom configuration
client2 = GoogleSecOpsClient(
include_ruf_payload=True,
ruf_payload_max_bytes=8192,
static_observer_name="test-observer",
static_observer_vendor="test-vendor",
static_environment="prod",
use_stdout=True
)
assert client2.include_ruf_payload is True
assert client2.ruf_payload_max_bytes == 8192
assert client2.static_observer_name == "test-observer"
assert client2.static_observer_vendor == "test-vendor"
assert client2.static_environment == "prod"
print("Passed!")
def testGoogleSecOpsSmtpTlsReport(self):
"""Test Google SecOps SMTP TLS report conversion"""
print()
from parsedmarc.google_secops import GoogleSecOpsClient
client = GoogleSecOpsClient(use_stdout=True)
sample_path = "samples/smtp_tls/rfc8460.json"
print("Testing Google SecOps SMTP TLS conversion for {0}: ".format(sample_path), end="")
parsed_file = parsedmarc.parse_report_file(sample_path)
parsed_report = parsed_file["report"]
events = client.save_smtp_tls_report_to_google_secops(parsed_report)
# Verify we got events
assert len(events) > 0, "Expected at least one event"
# Verify each event is valid JSON
for event in events:
event_dict = json.loads(event)
assert "event_type" in event_dict
assert event_dict["event_type"] == "SMTP_TLS_REPORT"
assert "metadata" in event_dict
assert "target" in event_dict
assert "security_result" in event_dict
# Verify failed_session_count is in detection_fields as an integer
found_count = False
for field in event_dict["security_result"][0]["detection_fields"]:
if field["key"] == "smtp_tls.failed_session_count":
assert isinstance(field["value"], int), "failed_session_count should be an integer"
found_count = True
break
assert found_count, "failed_session_count should be in detection_fields"
print("Passed!")
if __name__ == "__main__":