Align Google SecOps module with parsedmarc 10.x terminology (forensic → failure)

This commit is contained in:
copilot-swe-agent[bot]
2026-06-04 00:12:29 +00:00
committed by GitHub
parent dab2aaffda
commit 7aa29df4a7
4 changed files with 129 additions and 76 deletions
+30 -20
View File
@@ -13,7 +13,7 @@ The recommended approach is to send events directly to Chronicle via the Ingesti
```ini
[general]
save_aggregate = True
save_forensic = True
save_failure = True
[google_secops]
# Required: Path to Google service account JSON credentials file
@@ -29,12 +29,12 @@ api_region = us
# Optional: Log type for Chronicle ingestion (default: DMARC)
api_log_type = DMARC
# Optional: Include forensic report message payload (default: False)
# Optional: Include failure report message payload (default: False)
# For privacy, message bodies are excluded by default
include_ruf_payload = False
include_failure_payload = False
# Optional: Maximum bytes of forensic message payload to include (default: 4096)
ruf_payload_max_bytes = 4096
failure_payload_max_bytes = 4096
# Optional: Static observer name for telemetry identification
static_observer_name = my-parsedmarc-instance
@@ -56,13 +56,23 @@ If you prefer to use an external log shipper (Fluentd, Logstash, Chronicle forwa
use_stdout = True
# Other optional configuration options (as above)
include_ruf_payload = False
ruf_payload_max_bytes = 4096
include_failure_payload = False
failure_payload_max_bytes = 4096
static_observer_name = my-instance
static_observer_vendor = parsedmarc
static_environment = prod
```
### Backward Compatibility Note
**parsedmarc 10.0+** aligns with the RFC terminology change from "forensic" to "failure" reports. The configuration now uses:
- `save_failure` instead of `save_forensic` (in `[general]` section)
- `include_failure_payload` instead of `include_ruf_payload`
- `failure_payload_max_bytes` instead of `ruf_payload_max_bytes`
- Event type `DMARC_FAILURE` instead of `DMARC_FORENSIC`
For backward compatibility, the old parameter names (`include_ruf_payload`, `ruf_payload_max_bytes`) are still supported but will generate deprecation warnings. Please update your configuration to use the new names.
## Output Format
The Google SecOps output produces newline-delimited JSON (NDJSON) in Chronicle UDM format, which can be ingested into Google SecOps for hunting and dashboarding.
@@ -70,7 +80,7 @@ The Google SecOps output produces newline-delimited JSON (NDJSON) in Chronicle U
### Event Types
1. **DMARC_AGGREGATE**: One event per aggregate report row, preserving count and period information
2. **DMARC_FORENSIC**: One event per forensic report
2. **DMARC_FAILURE**: One event per failure report
3. **SMTP_TLS_REPORT**: One event per SMTP TLS failure detail
4. **DMARC_PARSE_ERROR**: Generated when parsing fails (does not crash)
@@ -128,7 +138,7 @@ Each event includes:
- `dmarc.source_service_name` (optional): Enriched service name from reverse DNS mapping
- `dmarc.source_service_type` (optional): Enriched service type (e.g., "Email Provider", "Webmail", "Marketing")
**Forensic Report Fields** (`DMARC_FORENSIC` events):
**Failure Report Fields** (`DMARC_FAILURE` events):
- `dmarc.auth_failure`: Authentication failure type(s) (dmarc, spf, dkim)
- `dmarc.reported_domain`: Domain that failed DMARC authentication
- `dmarc.source_service_name` (optional): Enriched service name from reverse DNS mapping
@@ -201,11 +211,11 @@ Each event includes:
}
```
### Forensic Report Event
### Failure Report Event
```json
{
"event_type": "DMARC_FORENSIC",
"event_type": "DMARC_FAILURE",
"metadata": {
"event_timestamp": "2019-04-30T02:09:00+00:00",
"event_type": "GENERIC_EVENT",
@@ -222,7 +232,7 @@ Each event includes:
},
"security_result": [{
"severity": "MEDIUM",
"description": "DMARC forensic report: authentication failure (dmarc)",
"description": "DMARC failure report: authentication failure (dmarc)",
"detection_fields": [
{"key": "dmarc.auth_failure", "value": "dmarc"},
{"key": "dmarc.reported_domain", "value": "example.com"},
@@ -357,17 +367,17 @@ rule repeated_dmarc_failures {
}
```
### Find DMARC forensic reports with authentication failures
### Find DMARC failure reports with authentication failures
```yara-l
rule dmarc_forensic_failures {
rule dmarc_failure_failures {
meta:
author = "parsedmarc"
description = "Detect DMARC forensic reports with authentication failures"
description = "Detect DMARC failure reports with authentication failures"
events:
$e.metadata.product_name = "parsedmarc"
$e.event_type = "DMARC_FORENSIC"
$e.event_type = "DMARC_FAILURE"
$e.security_result.detection_fields.key = "dmarc.auth_failure"
condition:
@@ -415,10 +425,10 @@ rule smtp_tls_failures {
## Privacy Considerations
By default, forensic report message bodies are **excluded** from the output to protect privacy. If you need to include message samples for investigation:
By default, failure report message bodies are **excluded** from the output to protect privacy. If you need to include message samples for investigation:
1. Set `include_ruf_payload = True` in your configuration
2. Adjust `ruf_payload_max_bytes` to limit the amount of data included (default: 4096 bytes)
1. Set `include_failure_payload = True` in your configuration
2. Adjust `failure_payload_max_bytes` to limit the amount of data included (default: 4096 bytes)
3. Message samples will be truncated if they exceed the configured maximum
**Note**: Be aware of data privacy regulations (GDPR, CCPA, etc.) when including message payloads in security telemetry.
@@ -465,7 +475,7 @@ The Google SecOps output automatically works when monitoring mailboxes via IMAP,
```ini
[general]
save_aggregate = True
save_forensic = True
save_failure = True
[mailbox]
watch = True
@@ -480,7 +490,7 @@ password = yourpassword
[google_secops]
# Use stdout mode for log shipper integration
use_stdout = True
include_ruf_payload = False
include_failure_payload = False
static_observer_name = mailbox-monitor
static_environment = prod
```
+23 -9
View File
@@ -380,7 +380,7 @@ def _main():
try:
if opts.google_secops:
events = google_secops_client.save_forensic_report_to_google_secops(report)
events = google_secops_client.save_failure_report_to_google_secops(report)
for event in events:
print(event)
except Exception as error_:
@@ -748,8 +748,8 @@ def _main():
gelf_port=None,
gelf_mode=None,
google_secops=False,
google_secops_include_ruf_payload=False,
google_secops_ruf_payload_max_bytes=4096,
google_secops_include_failure_payload=False,
google_secops_failure_payload_max_bytes=4096,
google_secops_static_observer_name=None,
google_secops_static_observer_vendor="parsedmarc",
google_secops_static_environment=None,
@@ -1340,12 +1340,26 @@ def _main():
if "google_secops" in config.sections():
google_secops_config = config["google_secops"]
opts.google_secops = True
if "include_ruf_payload" in google_secops_config:
opts.google_secops_include_ruf_payload = bool(
# New parameter names (parsedmarc 10+)
if "include_failure_payload" in google_secops_config:
opts.google_secops_include_failure_payload = bool(
google_secops_config.getboolean("include_failure_payload")
)
# Backward compatibility: old parameter name
elif "include_ruf_payload" in google_secops_config:
logger.warning("include_ruf_payload is deprecated, use include_failure_payload instead")
opts.google_secops_include_failure_payload = bool(
google_secops_config.getboolean("include_ruf_payload")
)
if "ruf_payload_max_bytes" in google_secops_config:
opts.google_secops_ruf_payload_max_bytes = google_secops_config.getint(
# New parameter names (parsedmarc 10+)
if "failure_payload_max_bytes" in google_secops_config:
opts.google_secops_failure_payload_max_bytes = google_secops_config.getint(
"failure_payload_max_bytes"
)
# Backward compatibility: old parameter name
elif "ruf_payload_max_bytes" in google_secops_config:
logger.warning("ruf_payload_max_bytes is deprecated, use failure_payload_max_bytes instead")
opts.google_secops_failure_payload_max_bytes = google_secops_config.getint(
"ruf_payload_max_bytes"
)
if "static_observer_name" in google_secops_config:
@@ -1562,8 +1576,8 @@ def _main():
if opts.google_secops:
try:
google_secops_client = google_secops.GoogleSecOpsClient(
include_ruf_payload=opts.google_secops_include_ruf_payload,
ruf_payload_max_bytes=opts.google_secops_ruf_payload_max_bytes,
include_failure_payload=opts.google_secops_include_failure_payload,
failure_payload_max_bytes=opts.google_secops_failure_payload_max_bytes,
static_observer_name=opts.google_secops_static_observer_name,
static_observer_vendor=opts.google_secops_static_observer_vendor,
static_environment=opts.google_secops_static_environment,
+66 -37
View File
@@ -26,8 +26,8 @@ class GoogleSecOpsClient:
def __init__(
self,
include_ruf_payload: bool = False,
ruf_payload_max_bytes: int = 4096,
include_failure_payload: bool = False,
failure_payload_max_bytes: int = 4096,
static_observer_name: Optional[str] = None,
static_observer_vendor: str = "parsedmarc",
static_environment: Optional[str] = None,
@@ -36,13 +36,16 @@ class GoogleSecOpsClient:
api_region: str = "us",
api_log_type: str = "DMARC",
use_stdout: bool = False,
# Backward compatibility parameters (deprecated)
include_ruf_payload: Optional[bool] = None,
ruf_payload_max_bytes: Optional[int] = None,
):
"""
Initializes the GoogleSecOpsClient
Args:
include_ruf_payload: Include RUF message payload in output
ruf_payload_max_bytes: Maximum bytes of RUF payload to include
include_failure_payload: Include failure report message payload in output
failure_payload_max_bytes: Maximum bytes of failure report payload to include
static_observer_name: Static observer name for telemetry
static_observer_vendor: Static observer vendor (default: parsedmarc)
static_environment: Static environment (prod/dev/custom string)
@@ -51,9 +54,19 @@ class GoogleSecOpsClient:
api_region: Chronicle region (us, europe, asia-southeast1, etc.)
api_log_type: Log type for Chronicle ingestion (default: DMARC)
use_stdout: Output to stdout instead of API (default: False)
include_ruf_payload: (Deprecated) Use include_failure_payload instead
ruf_payload_max_bytes: (Deprecated) Use failure_payload_max_bytes instead
"""
self.include_ruf_payload = include_ruf_payload
self.ruf_payload_max_bytes = ruf_payload_max_bytes
# Handle backward compatibility
if include_ruf_payload is not None:
logger.warning("include_ruf_payload is deprecated, use include_failure_payload instead")
include_failure_payload = include_ruf_payload
if ruf_payload_max_bytes is not None:
logger.warning("ruf_payload_max_bytes is deprecated, use failure_payload_max_bytes instead")
failure_payload_max_bytes = ruf_payload_max_bytes
self.include_failure_payload = include_failure_payload
self.failure_payload_max_bytes = failure_payload_max_bytes
self.static_observer_name = static_observer_name
self.static_observer_vendor = static_observer_vendor
self.static_environment = static_environment
@@ -443,47 +456,47 @@ class GoogleSecOpsClient:
# Return events only if using stdout (for CLI to print)
return events if self.use_stdout else []
def save_forensic_report_to_google_secops(
self, forensic_report: dict[str, Any]
def save_failure_report_to_google_secops(
self, failure_report: dict[str, Any]
) -> list[str]:
"""
Convert forensic DMARC report to Google SecOps UDM format and send to Chronicle
Convert failure DMARC report to Google SecOps UDM format and send to Chronicle
When use_stdout=False: Events are sent to Chronicle API, returns empty list
When use_stdout=True: Returns list of NDJSON event strings for stdout
Args:
forensic_report: Forensic report dictionary from parsedmarc
failure_report: Failure report dictionary from parsedmarc
Returns:
List of NDJSON event strings (empty if sent to API)
"""
logger.debug("Converting forensic report to Google SecOps UDM format")
logger.debug("Converting failure report to Google SecOps UDM format")
events = []
try:
source_ip = forensic_report["source"]["ip_address"]
source_country = forensic_report["source"].get("country")
source_reverse_dns = forensic_report["source"].get("reverse_dns")
source_base_domain = forensic_report["source"].get("base_domain")
source_name = forensic_report["source"].get("name")
source_type = forensic_report["source"].get("type")
source_ip = failure_report["source"]["ip_address"]
source_country = failure_report["source"].get("country")
source_reverse_dns = failure_report["source"].get("reverse_dns")
source_base_domain = failure_report["source"].get("base_domain")
source_name = failure_report["source"].get("name")
source_type = failure_report["source"].get("type")
reported_domain = forensic_report["reported_domain"]
arrival_date = forensic_report["arrival_date_utc"]
auth_failure = forensic_report.get("auth_failure", [])
reported_domain = failure_report["reported_domain"]
arrival_date = failure_report["arrival_date_utc"]
auth_failure = failure_report.get("auth_failure", [])
# Determine severity - forensic reports indicate failures
# Determine severity - failure reports indicate authentication failures
# Default to MEDIUM for authentication failures
severity = "MEDIUM"
# Build description
auth_failure_str = ", ".join(auth_failure) if auth_failure else "unknown"
description = f"DMARC forensic report: authentication failure ({auth_failure_str})"
description = f"DMARC failure report: authentication failure ({auth_failure_str})"
# Build UDM event
event: dict[str, Any] = {
"event_type": "DMARC_FORENSIC",
"event_type": "DMARC_FAILURE",
"metadata": {
"event_timestamp": self._format_timestamp(arrival_date),
"event_type": "GENERIC_EVENT",
@@ -528,24 +541,24 @@ class GoogleSecOpsClient:
{"key": "source_base_domain", "value": source_base_domain}
)
if forensic_report.get("feedback_type"):
if failure_report.get("feedback_type"):
additional_context.append(
{"key": "feedback_type", "value": forensic_report["feedback_type"]}
{"key": "feedback_type", "value": failure_report["feedback_type"]}
)
if forensic_report.get("message_id"):
if failure_report.get("message_id"):
additional_context.append(
{"key": "message_id", "value": forensic_report["message_id"]}
{"key": "message_id", "value": failure_report["message_id"]}
)
if forensic_report.get("authentication_results"):
if failure_report.get("authentication_results"):
additional_context.append(
{"key": "authentication_results", "value": forensic_report["authentication_results"]}
{"key": "authentication_results", "value": failure_report["authentication_results"]}
)
if forensic_report.get("delivery_result"):
if failure_report.get("delivery_result"):
additional_context.append(
{"key": "delivery_result", "value": forensic_report["delivery_result"]}
{"key": "delivery_result", "value": failure_report["delivery_result"]}
)
if self.static_environment:
@@ -554,10 +567,10 @@ class GoogleSecOpsClient:
)
# Add payload excerpt if enabled
if self.include_ruf_payload and forensic_report.get("sample"):
sample = forensic_report["sample"]
if len(sample) > self.ruf_payload_max_bytes:
sample = sample[:self.ruf_payload_max_bytes] + "... [truncated]"
if self.include_failure_payload and failure_report.get("sample"):
sample = failure_report["sample"]
if len(sample) > self.failure_payload_max_bytes:
sample = sample[:self.failure_payload_max_bytes] + "... [truncated]"
additional_context.append(
{"key": "message_sample", "value": sample}
)
@@ -579,7 +592,7 @@ class GoogleSecOpsClient:
events.append(json.dumps(event, ensure_ascii=False))
except Exception as e:
logger.error(f"Error converting forensic report to Google SecOps format: {e}")
logger.error(f"Error converting failure report to Google SecOps format: {e}")
# Generate error event
error_event: dict[str, Any] = {
"event_type": "DMARC_PARSE_ERROR",
@@ -592,7 +605,7 @@ class GoogleSecOpsClient:
"security_result": [
{
"severity": "ERROR",
"description": f"Failed to parse DMARC forensic report: {str(e)}",
"description": f"Failed to parse DMARC failure report: {str(e)}",
}
],
}
@@ -707,3 +720,19 @@ class GoogleSecOpsClient:
# Return events only if using stdout (for CLI to print)
return events if self.use_stdout else []
# Backward compatibility alias (deprecated)
def save_forensic_report_to_google_secops(
self, forensic_report: dict[str, Any]
) -> list[str]:
"""
Deprecated: Use save_failure_report_to_google_secops instead.
This method is maintained for backward compatibility and will be removed
in a future version. Please update your code to use the new naming.
"""
logger.warning(
"save_forensic_report_to_google_secops is deprecated, "
"use save_failure_report_to_google_secops instead"
)
return self.save_failure_report_to_google_secops(forensic_report)
+10 -10
View File
@@ -186,20 +186,20 @@ class Test(unittest.TestCase):
print("Passed!")
def testGoogleSecOpsForensicReport(self):
"""Test Google SecOps forensic report conversion"""
def testGoogleSecOpsFailureReport(self):
"""Test Google SecOps failure report conversion"""
print()
from parsedmarc.google_secops import GoogleSecOpsClient
# Test without payload
client = GoogleSecOpsClient(include_ruf_payload=False, use_stdout=True)
client = GoogleSecOpsClient(include_failure_payload=False, use_stdout=True)
sample_path = "samples/forensic/dmarc_ruf_report_linkedin.eml"
print("Testing Google SecOps forensic conversion (no payload) for {0}: ".format(sample_path), end="")
print("Testing Google SecOps failure conversion (no payload) for {0}: ".format(sample_path), end="")
parsed_file = parsedmarc.parse_report_file(sample_path)
parsed_report = parsed_file["report"]
events = client.save_forensic_report_to_google_secops(parsed_report)
events = client.save_failure_report_to_google_secops(parsed_report)
# Verify we got events
assert len(events) > 0, "Expected at least one event"
@@ -208,7 +208,7 @@ class Test(unittest.TestCase):
for event in events:
event_dict = json.loads(event)
assert "event_type" in event_dict
assert event_dict["event_type"] == "DMARC_FORENSIC"
assert event_dict["event_type"] == "DMARC_FAILURE"
# Verify no payload in additional fields
if "additional" in event_dict and "fields" in event_dict["additional"]:
@@ -219,13 +219,13 @@ class Test(unittest.TestCase):
# Test with payload
client_with_payload = GoogleSecOpsClient(
include_ruf_payload=True,
ruf_payload_max_bytes=100,
include_failure_payload=True,
failure_payload_max_bytes=100,
use_stdout=True
)
print("Testing Google SecOps forensic conversion (with payload) for {0}: ".format(sample_path), end="")
print("Testing Google SecOps failure conversion (with payload) for {0}: ".format(sample_path), end="")
events_with_payload = client_with_payload.save_forensic_report_to_google_secops(parsed_report)
events_with_payload = client_with_payload.save_failure_report_to_google_secops(parsed_report)
# Verify we got events
assert len(events_with_payload) > 0, "Expected at least one event"