Compare commits

..

4 Commits

Author SHA1 Message Date
Blackmoon
221bc332ef Fixed a typo in policies.successful_session_count (#654) 2026-02-09 13:57:11 -05:00
Sean Whalen
a2a75f7a81 Fix timestamp parsing in aggregate report by removing fractional seconds 2026-01-21 13:08:48 -05:00
Anael Mobilia
50fcb51577 Update supported Python versions in docs + readme (#652)
* Update README.md

* Update index.md

* Update python-tests.yml
2026-01-19 14:40:01 -05:00
Sean Whalen
dd9ef90773 9.0.10
- Support Python 3.14+
2026-01-17 14:09:18 -05:00
12 changed files with 14 additions and 1477 deletions

View File

@@ -30,7 +30,7 @@ jobs:
strategy:
fail-fast: false
matrix:
python-version: ["3.9", "3.10", "3.11", "3.12", "3.13"]
python-version: ["3.9", "3.10", "3.11", "3.12", "3.13", "3.14"]
steps:
- uses: actions/checkout@v5

View File

@@ -1,5 +1,9 @@
# Changelog
## 9.0.10
- Support Python 3.14+
## 9.0.9
### Fixes

View File

@@ -44,7 +44,6 @@ Thanks to all
- Optionally send the results to Elasticsearch, Opensearch, and/or Splunk, for
use with premade dashboards
- Optionally send reports to Apache Kafka
- Optionally send reports to Google SecOps (Chronicle) in UDM format via API or stdout
## Python Compatibility
@@ -62,4 +61,4 @@ for RHEL or Debian.
| 3.11 | ✅ | Actively maintained; supported until June 2028 (Debian 12) |
| 3.12 | ✅ | Actively maintained; supported until May 2035 (RHEL 10) |
| 3.13 | ✅ | Actively maintained; supported until June 2030 (Debian 13) |
| 3.14 | | Not currently supported due to [this imapclient bug](https://github.com/mjs/imapclient/issues/618)|
| 3.14 | | Actively maintained |

View File

@@ -1,494 +0,0 @@
# Google SecOps (Chronicle) Output
`parsedmarc` can output DMARC reports to Google SecOps (Chronicle) in UDM (Unified Data Model) format.
## Configuration
To enable Google SecOps output, add a `[google_secops]` section to your configuration file:
### Primary Method: Chronicle Ingestion API
The recommended approach is to send events directly to Chronicle via the Ingestion API:
```ini
[general]
save_aggregate = True
save_forensic = True
[google_secops]
# Required: Path to Google service account JSON credentials file
api_credentials_file = /path/to/service-account-credentials.json
# Required: Chronicle customer ID
api_customer_id = your-customer-id-here
# Optional: Chronicle region (default: us)
# Options: us, europe, asia-southeast1, me-central2, australia-southeast1
api_region = us
# Optional: Log type for Chronicle ingestion (default: DMARC)
api_log_type = DMARC
# Optional: Include forensic report message payload (default: False)
# For privacy, message bodies are excluded by default
include_ruf_payload = False
# Optional: Maximum bytes of forensic message payload to include (default: 4096)
ruf_payload_max_bytes = 4096
# Optional: Static observer name for telemetry identification
static_observer_name = my-parsedmarc-instance
# Optional: Static observer vendor (default: parsedmarc)
static_observer_vendor = parsedmarc
# Optional: Static environment label (e.g., prod, dev)
static_environment = prod
```
### Alternative Method: stdout Output
If you prefer to use an external log shipper (Fluentd, Logstash, Chronicle forwarder), set `use_stdout = True`:
```ini
[google_secops]
# Output to stdout instead of Chronicle API
use_stdout = True
# Other optional configuration options (as above)
include_ruf_payload = False
ruf_payload_max_bytes = 4096
static_observer_name = my-instance
static_observer_vendor = parsedmarc
static_environment = prod
```
## Output Format
The Google SecOps output produces newline-delimited JSON (NDJSON) in Chronicle UDM format, which can be ingested into Google SecOps for hunting and dashboarding.
### Event Types
1. **DMARC_AGGREGATE**: One event per aggregate report row, preserving count and period information
2. **DMARC_FORENSIC**: One event per forensic report
3. **SMTP_TLS_REPORT**: One event per SMTP TLS failure detail
4. **DMARC_PARSE_ERROR**: Generated when parsing fails (does not crash)
### UDM Schema
Each event includes:
- **metadata**: Event timestamp, type, product name, and vendor
- `event_timestamp`: RFC 3339 formatted timestamp
- `event_type`: Always "GENERIC_EVENT" for UDM
- `product_name`: Always "parsedmarc"
- `vendor_name`: Configurable via `static_observer_vendor` (default: "parsedmarc")
- `product_deployment_id` (optional): Set via `static_observer_name` config
- **principal**: Source IP address, location (country), and hostname (reverse DNS)
- `ip`: Array containing source IP address (always present)
- `location.country_or_region` (optional): ISO country code from IP geolocation
- `hostname` (optional): Reverse DNS hostname for source IP
- **target**: Domain name (from DMARC policy)
- `domain.name`: The domain being protected by DMARC
- **security_result**: Severity level, description, and detection fields for dashboarding
- `severity`: Derived severity level (HIGH/MEDIUM/LOW/ERROR)
- `description`: Human-readable event description
- **detection_fields**: Key DMARC dimensions for filtering and grouping
- All dashboard-relevant fields use `dmarc.*` or `smtp_tls.*` prefixes for easy identification
- Includes IP enrichment data (service name and type from reverse DNS mapping) for enhanced filtering
- See "Detection Fields" section below for complete field listings
- **additional.fields** (optional): Low-value context fields not typically used for dashboarding
- SPF/DKIM authentication details (e.g., `spf_0_domain`, `spf_0_result`)
- Forensic report metadata (e.g., `feedback_type`, `message_id`, `authentication_results`)
- Base domain, environment tags, optional message samples
**Design Rationale**: DMARC dimensions are placed in `security_result[].detection_fields` rather than `additional.fields` because Chronicle dashboards, stats searches, and aggregations work best with UDM label arrays. The `additional.fields` is a protobuf Struct intended for opaque context and is not reliably queryable for dashboard operations.
### Detection Fields
**Aggregate Report Fields** (`DMARC_AGGREGATE` events):
- `dmarc.disposition`: DMARC policy action (none, quarantine, reject)
- `dmarc.policy`: Published DMARC policy (none, quarantine, reject)
- `dmarc.pass`: Boolean - overall DMARC authentication result
- `dmarc.spf_aligned`: Boolean - SPF alignment status
- `dmarc.dkim_aligned`: Boolean - DKIM alignment status
- `dmarc.header_from`: Header From domain
- `dmarc.envelope_from`: Envelope From domain (MAIL FROM)
- `dmarc.report_org`: Reporting organization name
- `dmarc.report_id`: Unique report identifier
- `dmarc.report_begin`: Report period start timestamp
- `dmarc.report_end`: Report period end timestamp
- `dmarc.row_count`: Number of messages represented by this record
- `dmarc.spf_result` (optional): SPF authentication result (pass, fail, etc.)
- `dmarc.dkim_result` (optional): DKIM authentication result (pass, fail, etc.)
- `dmarc.source_service_name` (optional): Enriched service name from reverse DNS mapping
- `dmarc.source_service_type` (optional): Enriched service type (e.g., "Email Provider", "Webmail", "Marketing")
**Forensic Report Fields** (`DMARC_FORENSIC` events):
- `dmarc.auth_failure`: Authentication failure type(s) (dmarc, spf, dkim)
- `dmarc.reported_domain`: Domain that failed DMARC authentication
- `dmarc.source_service_name` (optional): Enriched service name from reverse DNS mapping
- `dmarc.source_service_type` (optional): Enriched service type (e.g., "Email Provider", "Webmail", "Marketing")
**SMTP TLS Report Fields** (`SMTP_TLS_REPORT` events):
- `smtp_tls.policy_domain`: Domain being monitored for TLS policy
- `smtp_tls.result_type`: Type of TLS failure (certificate-expired, validation-failure, etc.)
- `smtp_tls.failed_session_count`: Number of failed sessions
- `smtp_tls.report_org`: Reporting organization name
- `smtp_tls.report_begin`: Report period start timestamp
- `smtp_tls.report_end`: Report period end timestamp
### Severity Heuristics
- **HIGH**: DMARC disposition = reject
- **MEDIUM**: DMARC disposition = quarantine with partial SPF/DKIM failures
- **LOW**: DMARC disposition = none or pass
## Example Output
### Aggregate Report Event
```json
{
"event_type": "DMARC_AGGREGATE",
"metadata": {
"event_timestamp": "2018-06-19T00:00:00+00:00",
"event_type": "GENERIC_EVENT",
"product_name": "parsedmarc",
"vendor_name": "parsedmarc"
},
"principal": {
"ip": ["199.230.200.36"],
"location": {"country_or_region": "US"}
},
"target": {
"domain": {"name": "example.com"}
},
"security_result": [{
"severity": "LOW",
"description": "DMARC fail; SPF=pass; DKIM=pass; SPF not aligned; DKIM not aligned; disposition=none",
"detection_fields": [
{"key": "dmarc.disposition", "value": "none"},
{"key": "dmarc.policy", "value": "none"},
{"key": "dmarc.pass", "value": false},
{"key": "dmarc.spf_aligned", "value": false},
{"key": "dmarc.dkim_aligned", "value": false},
{"key": "dmarc.header_from", "value": "example.com"},
{"key": "dmarc.envelope_from", "value": "example.com"},
{"key": "dmarc.report_org", "value": "example.net"},
{"key": "dmarc.report_id", "value": "b043f0e264cf4ea995e93765242f6dfb"},
{"key": "dmarc.report_begin", "value": "2018-06-19 00:00:00"},
{"key": "dmarc.report_end", "value": "2018-06-19 23:59:59"},
{"key": "dmarc.row_count", "value": 1},
{"key": "dmarc.spf_result", "value": "pass"},
{"key": "dmarc.dkim_result", "value": "pass"},
{"key": "dmarc.source_service_name", "value": "Example Mail Service"},
{"key": "dmarc.source_service_type", "value": "Email Provider"}
]
}],
"additional": {
"fields": [
{"key": "spf_0_domain", "value": "example.com"},
{"key": "spf_0_result", "value": "pass"},
{"key": "dkim_0_domain", "value": "example.com"},
{"key": "dkim_0_result", "value": "pass"}
]
}
}
```
### Forensic Report Event
```json
{
"event_type": "DMARC_FORENSIC",
"metadata": {
"event_timestamp": "2019-04-30T02:09:00+00:00",
"event_type": "GENERIC_EVENT",
"product_name": "parsedmarc",
"vendor_name": "parsedmarc"
},
"principal": {
"ip": ["10.10.10.10"],
"location": {"country_or_region": "US"},
"hostname": "mail.example-sender.com"
},
"target": {
"domain": {"name": "example.com"}
},
"security_result": [{
"severity": "MEDIUM",
"description": "DMARC forensic report: authentication failure (dmarc)",
"detection_fields": [
{"key": "dmarc.auth_failure", "value": "dmarc"},
{"key": "dmarc.reported_domain", "value": "example.com"},
{"key": "dmarc.source_service_name", "value": "Example Mail Provider"},
{"key": "dmarc.source_service_type", "value": "Email Provider"}
]
}],
"additional": {
"fields": [
{"key": "feedback_type", "value": "auth-failure"},
{"key": "message_id", "value": "<01010101010101010101010101010101@ABAB01MS0016.someserver.loc>"},
{"key": "authentication_results", "value": "dmarc=fail (p=none; dis=none) header.from=example.com"},
{"key": "delivery_result", "value": "delivered"}
]
}
}
```
### SMTP TLS Report Event
```json
{
"event_type": "SMTP_TLS_REPORT",
"metadata": {
"event_timestamp": "2016-04-01T00:00:00+00:00",
"event_type": "GENERIC_EVENT",
"product_name": "parsedmarc",
"vendor_name": "parsedmarc"
},
"target": {
"domain": {
"name": "company-y.example"
}
},
"security_result": [{
"severity": "LOW",
"description": "SMTP TLS failure: certificate-expired",
"detection_fields": [
{"key": "smtp_tls.policy_domain", "value": "company-y.example"},
{"key": "smtp_tls.result_type", "value": "certificate-expired"},
{"key": "smtp_tls.failed_session_count", "value": 100},
{"key": "smtp_tls.report_org", "value": "Company-X"},
{"key": "smtp_tls.report_begin", "value": "2016-04-01T00:00:00Z"},
{"key": "smtp_tls.report_end", "value": "2016-04-01T23:59:59Z"}
]
}],
"principal": {
"ip": ["2001:db8:abcd:0012::1"]
}
}
```
### Parse Error Event
```json
{
"event_type": "DMARC_PARSE_ERROR",
"metadata": {
"event_timestamp": "2026-01-09T16:22:10.933751+00:00",
"event_type": "GENERIC_EVENT",
"product_name": "parsedmarc",
"vendor_name": "parsedmarc"
},
"security_result": [{
"severity": "ERROR",
"description": "Failed to parse DMARC report: Invalid XML structure"
}]
}
```
## Google SecOps Searches
Here are some example YARA-L rules you can use in Google SecOps to hunt for DMARC issues:
### Find all DMARC aggregate report failures
```yara-l
rule dmarc_aggregate_failures {
meta:
author = "parsedmarc"
description = "Detect DMARC authentication failures in aggregate reports"
events:
$e.metadata.product_name = "parsedmarc"
$e.event_type = "DMARC_AGGREGATE"
$e.security_result.detection_fields.key = "dmarc.pass"
$e.security_result.detection_fields.value = false
condition:
$e
}
```
### Find high severity DMARC events (rejected mail)
```yara-l
rule high_severity_dmarc_events {
meta:
author = "parsedmarc"
description = "Detect high severity DMARC aggregate events (rejected mail)"
events:
$e.metadata.product_name = "parsedmarc"
$e.event_type = "DMARC_AGGREGATE"
$e.security_result.severity = "HIGH"
condition:
$e
}
```
### Find repeated DMARC failures from same source IP
```yara-l
rule repeated_dmarc_failures {
meta:
author = "parsedmarc"
description = "Detect repeated DMARC failures from the same source IP"
events:
$e.metadata.product_name = "parsedmarc"
$e.event_type = "DMARC_AGGREGATE"
$e.security_result.detection_fields.key = "dmarc.pass"
$e.security_result.detection_fields.value = false
$e.principal.ip = $source_ip
match:
$source_ip over 1h
condition:
#e > 5
}
```
### Find DMARC forensic reports with authentication failures
```yara-l
rule dmarc_forensic_failures {
meta:
author = "parsedmarc"
description = "Detect DMARC forensic reports with authentication failures"
events:
$e.metadata.product_name = "parsedmarc"
$e.event_type = "DMARC_FORENSIC"
$e.security_result.detection_fields.key = "dmarc.auth_failure"
condition:
$e
}
```
### Find DMARC failures from specific mail service types
```yara-l
rule dmarc_failures_by_service_type {
meta:
author = "parsedmarc"
description = "Detect DMARC failures from specific mail service types"
events:
$e.metadata.product_name = "parsedmarc"
$e.event_type = "DMARC_AGGREGATE"
$e.security_result.detection_fields.key = "dmarc.pass"
$e.security_result.detection_fields.value = false
$e.security_result.detection_fields.key = "dmarc.source_service_type"
$e.security_result.detection_fields.value = "Email Provider"
condition:
$e
}
```
### Find SMTP TLS failures
```yara-l
rule smtp_tls_failures {
meta:
author = "parsedmarc"
description = "Detect SMTP TLS failures"
events:
$e.metadata.product_name = "parsedmarc"
$e.event_type = "SMTP_TLS_REPORT"
condition:
$e
}
```
## Privacy Considerations
By default, forensic report message bodies are **excluded** from the output to protect privacy. If you need to include message samples for investigation:
1. Set `include_ruf_payload = True` in your configuration
2. Adjust `ruf_payload_max_bytes` to limit the amount of data included (default: 4096 bytes)
3. Message samples will be truncated if they exceed the configured maximum
**Note**: Be aware of data privacy regulations (GDPR, CCPA, etc.) when including message payloads in security telemetry.
## Usage
The Google SecOps output works with all parsedmarc input methods, including file processing and mailbox monitoring.
### Primary Method: Direct API Ingestion
With Chronicle Ingestion API configured, events are sent directly to Chronicle:
```bash
# Process files - events are sent to Chronicle API automatically
parsedmarc -c config.ini samples/aggregate/*.xml
# Monitor mailbox - events are sent to Chronicle API in real-time
parsedmarc -c config.ini
```
No additional log shippers or pipelines are needed. The Google SecOps client handles authentication and batching automatically.
### Alternative Method: stdout Output with Log Shipper
If using `use_stdout = True` in your configuration, output DMARC reports to an external log shipper:
#### Processing Files
```bash
# Output to stdout
parsedmarc -c config.ini samples/aggregate/*.xml > dmarc_events.ndjson
# Stream to file
parsedmarc -c config.ini samples/aggregate/*.xml >> /var/log/dmarc/events.ndjson
# Pipe to log shipper (e.g., Fluentd, Logstash, Chronicle forwarder)
parsedmarc -c config.ini samples/aggregate/*.xml | fluentd
```
#### Monitoring Mailboxes
The Google SecOps output automatically works when monitoring mailboxes via IMAP, Microsoft Graph, or Gmail API. Configure your mailbox connection and enable watching:
```ini
[general]
save_aggregate = True
save_forensic = True
[mailbox]
watch = True
delete = False
batch_size = 10
[imap]
host = imap.example.com
user = dmarc@example.com
password = yourpassword
[google_secops]
# Use stdout mode for log shipper integration
use_stdout = True
include_ruf_payload = False
static_observer_name = mailbox-monitor
static_environment = prod
```
When watching a mailbox with stdout mode, parsedmarc continuously outputs UDM events as new reports arrive:
```bash
parsedmarc -c config.ini | fluentd
```
The output is in newline-delimited JSON format, with one UDM event per line, ready for collection by your log shipper.

View File

@@ -44,7 +44,6 @@ and Valimail.
- Optionally send the results to Elasticsearch, Opensearch, and/or Splunk, for use
with premade dashboards
- Optionally send reports to Apache Kafka
- Optionally send reports to Google SecOps (Chronicle) in UDM format
## Python Compatibility
@@ -62,7 +61,7 @@ for RHEL or Debian.
| 3.11 | ✅ | Actively maintained; supported until June 2028 (Debian 12) |
| 3.12 | ✅ | Actively maintained; supported until May 2035 (RHEL 10) |
| 3.13 | ✅ | Actively maintained; supported until June 2030 (Debian 13) |
| 3.14 | | Not currently supported due to [this imapclient bug](https://github.com/mjs/imapclient/issues/618)|
| 3.14 | | Actively maintained |
```{toctree}
:caption: 'Contents'
@@ -75,7 +74,6 @@ elasticsearch
opensearch
kibana
splunk
google_secops
davmail
dmarc
contributing

File diff suppressed because one or more lines are too long

View File

@@ -751,8 +751,8 @@ def parse_aggregate_report_xml(
new_report_metadata["report_id"] = report_id
date_range = report["report_metadata"]["date_range"]
begin_ts = int(date_range["begin"])
end_ts = int(date_range["end"])
begin_ts = int(date_range["begin"].split(".")[0])
end_ts = int(date_range["end"].split(".")[0])
span_seconds = end_ts - begin_ts
normalize_timespan = span_seconds > normalize_timespan_threshold_hours * 3600

View File

@@ -27,7 +27,6 @@ from parsedmarc import (
gelf,
get_dmarc_reports_from_mailbox,
get_dmarc_reports_from_mbox,
google_secops,
kafkaclient,
loganalytics,
opensearch,
@@ -285,14 +284,6 @@ def _main():
except Exception as error_:
logger.error("GELF Error: {0}".format(error_.__str__()))
try:
if opts.google_secops:
events = google_secops_client.save_aggregate_report_to_google_secops(report)
for event in events:
print(event)
except Exception as error_:
logger.error("Google SecOps Error: {0}".format(error_.__str__()))
try:
if opts.webhook_aggregate_url:
indent_value = 2 if opts.prettify_json else None
@@ -378,14 +369,6 @@ def _main():
except Exception as error_:
logger.error("GELF Error: {0}".format(error_.__str__()))
try:
if opts.google_secops:
events = google_secops_client.save_forensic_report_to_google_secops(report)
for event in events:
print(event)
except Exception as error_:
logger.error("Google SecOps Error: {0}".format(error_.__str__()))
try:
if opts.webhook_forensic_url:
indent_value = 2 if opts.prettify_json else None
@@ -471,14 +454,6 @@ def _main():
except Exception as error_:
logger.error("GELF Error: {0}".format(error_.__str__()))
try:
if opts.google_secops:
events = google_secops_client.save_smtp_tls_report_to_google_secops(report)
for event in events:
print(event)
except Exception as error_:
logger.error("Google SecOps Error: {0}".format(error_.__str__()))
try:
if opts.webhook_smtp_tls_url:
indent_value = 2 if opts.prettify_json else None
@@ -747,17 +722,6 @@ def _main():
gelf_host=None,
gelf_port=None,
gelf_mode=None,
google_secops=False,
google_secops_include_ruf_payload=False,
google_secops_ruf_payload_max_bytes=4096,
google_secops_static_observer_name=None,
google_secops_static_observer_vendor="parsedmarc",
google_secops_static_environment=None,
google_secops_api_credentials_file=None,
google_secops_api_customer_id=None,
google_secops_api_region="us",
google_secops_api_log_type="DMARC",
google_secops_use_stdout=False,
webhook_aggregate_url=None,
webhook_forensic_url=None,
webhook_smtp_tls_url=None,
@@ -1337,50 +1301,6 @@ def _main():
logger.critical("mode setting missing from the gelf config section")
exit(-1)
if "google_secops" in config.sections():
google_secops_config = config["google_secops"]
opts.google_secops = True
if "include_ruf_payload" in google_secops_config:
opts.google_secops_include_ruf_payload = bool(
google_secops_config.getboolean("include_ruf_payload")
)
if "ruf_payload_max_bytes" in google_secops_config:
opts.google_secops_ruf_payload_max_bytes = google_secops_config.getint(
"ruf_payload_max_bytes"
)
if "static_observer_name" in google_secops_config:
opts.google_secops_static_observer_name = google_secops_config[
"static_observer_name"
]
if "static_observer_vendor" in google_secops_config:
opts.google_secops_static_observer_vendor = google_secops_config[
"static_observer_vendor"
]
if "static_environment" in google_secops_config:
opts.google_secops_static_environment = google_secops_config[
"static_environment"
]
if "api_credentials_file" in google_secops_config:
opts.google_secops_api_credentials_file = google_secops_config[
"api_credentials_file"
]
if "api_customer_id" in google_secops_config:
opts.google_secops_api_customer_id = google_secops_config[
"api_customer_id"
]
if "api_region" in google_secops_config:
opts.google_secops_api_region = google_secops_config[
"api_region"
]
if "api_log_type" in google_secops_config:
opts.google_secops_api_log_type = google_secops_config[
"api_log_type"
]
if "use_stdout" in google_secops_config:
opts.google_secops_use_stdout = google_secops_config.getboolean(
"use_stdout"
)
if "webhook" in config.sections():
webhook_config = config["webhook"]
if "aggregate_url" in webhook_config:
@@ -1559,23 +1479,6 @@ def _main():
except Exception as error_:
logger.error("GELF Error: {0}".format(error_.__str__()))
if opts.google_secops:
try:
google_secops_client = google_secops.GoogleSecOpsClient(
include_ruf_payload=opts.google_secops_include_ruf_payload,
ruf_payload_max_bytes=opts.google_secops_ruf_payload_max_bytes,
static_observer_name=opts.google_secops_static_observer_name,
static_observer_vendor=opts.google_secops_static_observer_vendor,
static_environment=opts.google_secops_static_environment,
api_credentials_file=opts.google_secops_api_credentials_file,
api_customer_id=opts.google_secops_api_customer_id,
api_region=opts.google_secops_api_region,
api_log_type=opts.google_secops_api_log_type,
use_stdout=opts.google_secops_use_stdout,
)
except Exception as error_:
logger.error("Google SecOps Error: {0}".format(error_.__str__()))
if (
opts.webhook_aggregate_url
or opts.webhook_forensic_url

View File

@@ -1,3 +1,3 @@
__version__ = "9.0.9"
__version__ = "9.0.10"
USER_AGENT = f"parsedmarc/{__version__}"

View File

@@ -1,709 +0,0 @@
# -*- coding: utf-8 -*-
"""Google SecOps (Chronicle) output module for parsedmarc"""
from __future__ import annotations
import json
from datetime import datetime, timezone
from typing import Any, Optional
import requests
from google.auth.transport.requests import Request
from google.oauth2 import service_account
from parsedmarc.constants import USER_AGENT
from parsedmarc.log import logger
from parsedmarc.utils import human_timestamp_to_datetime
class GoogleSecOpsError(RuntimeError):
"""Raised when a Google SecOps API error occurs"""
class GoogleSecOpsClient:
"""A client for Google SecOps (Chronicle) UDM output"""
def __init__(
self,
include_ruf_payload: bool = False,
ruf_payload_max_bytes: int = 4096,
static_observer_name: Optional[str] = None,
static_observer_vendor: str = "parsedmarc",
static_environment: Optional[str] = None,
api_credentials_file: Optional[str] = None,
api_customer_id: Optional[str] = None,
api_region: str = "us",
api_log_type: str = "DMARC",
use_stdout: bool = False,
):
"""
Initializes the GoogleSecOpsClient
Args:
include_ruf_payload: Include RUF message payload in output
ruf_payload_max_bytes: Maximum bytes of RUF payload to include
static_observer_name: Static observer name for telemetry
static_observer_vendor: Static observer vendor (default: parsedmarc)
static_environment: Static environment (prod/dev/custom string)
api_credentials_file: Path to Google service account JSON credentials
api_customer_id: Chronicle customer ID (required for API)
api_region: Chronicle region (us, europe, asia-southeast1, etc.)
api_log_type: Log type for Chronicle ingestion (default: DMARC)
use_stdout: Output to stdout instead of API (default: False)
"""
self.include_ruf_payload = include_ruf_payload
self.ruf_payload_max_bytes = ruf_payload_max_bytes
self.static_observer_name = static_observer_name
self.static_observer_vendor = static_observer_vendor
self.static_environment = static_environment
self.use_stdout = use_stdout
# API configuration
self.api_credentials_file = api_credentials_file
self.api_customer_id = api_customer_id
self.api_region = api_region
self.api_log_type = api_log_type
self.credentials = None
self.session = None
# Initialize API client if not using stdout
if not self.use_stdout:
if not self.api_credentials_file or not self.api_customer_id:
raise GoogleSecOpsError(
"api_credentials_file and api_customer_id are required when not using stdout. "
"Set use_stdout=True to output to stdout instead."
)
self._initialize_api_client()
def _initialize_api_client(self):
"""Initialize the Chronicle API client with authentication"""
try:
logger.debug("Initializing Chronicle API client")
# Load service account credentials
self.credentials = service_account.Credentials.from_service_account_file(
self.api_credentials_file,
scopes=["https://www.googleapis.com/auth/cloud-platform"],
)
# Create session with authentication
self.session = requests.Session()
self.session.headers.update({"User-Agent": USER_AGENT})
logger.info("Chronicle API client initialized successfully")
except Exception as e:
raise GoogleSecOpsError(f"Failed to initialize Chronicle API client: {e}")
def _get_api_endpoint(self) -> str:
"""Get the Chronicle Ingestion API endpoint based on region"""
return f"https://{self.api_region}-chronicle.googleapis.com/v1alpha/projects/{self.api_customer_id}/locations/{self.api_region}/instances/default/logTypes/{self.api_log_type}/logs:import"
def _send_events_to_api(self, events: list[str]) -> None:
"""
Send UDM events to Chronicle Ingestion API
Args:
events: List of NDJSON event strings
"""
if not events:
return
try:
# Refresh credentials if needed
if not self.credentials.valid:
self.credentials.refresh(Request())
# Prepare request
endpoint = self._get_api_endpoint()
headers = {
"Authorization": f"Bearer {self.credentials.token}",
"Content-Type": "application/json",
}
# Chronicle expects events in inline_source format
# Each event should be a separate log entry
payload = {
"inline_source": {
"logs": [json.loads(event) for event in events]
}
}
logger.debug(f"Sending {len(events)} events to Chronicle API")
response = self.session.post(
endpoint,
headers=headers,
json=payload,
timeout=60,
)
if response.status_code == 200:
logger.info(f"Successfully sent {len(events)} events to Chronicle")
else:
error_msg = f"Chronicle API error: {response.status_code} - {response.text}"
logger.error(error_msg)
raise GoogleSecOpsError(error_msg)
except GoogleSecOpsError:
raise
except Exception as e:
raise GoogleSecOpsError(f"Failed to send events to Chronicle API: {e}")
def _output_events(self, events: list[str]) -> None:
"""
Output events either to stdout or Chronicle API
Args:
events: List of NDJSON event strings
"""
if self.use_stdout:
# Output to stdout for collection by external log shippers
for event in events:
print(event)
else:
# Send directly to Chronicle API
self._send_events_to_api(events)
def _get_severity(self, disposition: str, spf_aligned: bool, dkim_aligned: bool) -> str:
"""
Derive severity from DMARC disposition and alignment
Args:
disposition: DMARC policy disposition
spf_aligned: Whether SPF is aligned
dkim_aligned: Whether DKIM is aligned
Returns:
Severity level: HIGH, MEDIUM, or LOW
"""
if disposition == "reject":
return "HIGH"
elif disposition == "quarantine" and not (spf_aligned or dkim_aligned):
return "MEDIUM"
else:
return "LOW"
def _get_description(
self,
dmarc_pass: bool,
spf_result: Optional[str],
dkim_result: Optional[str],
spf_aligned: bool,
dkim_aligned: bool,
disposition: str,
) -> str:
"""
Generate description for the event
Args:
dmarc_pass: Whether DMARC passed
spf_result: SPF result
dkim_result: DKIM result
spf_aligned: Whether SPF is aligned
dkim_aligned: Whether DKIM is aligned
disposition: DMARC disposition
Returns:
Human-readable description
"""
parts = []
if dmarc_pass:
parts.append("DMARC pass")
else:
parts.append("DMARC fail")
if spf_result:
parts.append(f"SPF={spf_result}")
if dkim_result:
parts.append(f"DKIM={dkim_result}")
if spf_aligned:
parts.append("SPF aligned")
elif spf_result:
parts.append("SPF not aligned")
if dkim_aligned:
parts.append("DKIM aligned")
elif dkim_result:
parts.append("DKIM not aligned")
parts.append(f"disposition={disposition}")
return "; ".join(parts)
def _format_timestamp(self, timestamp_str: str) -> str:
"""
Convert parsedmarc timestamp to RFC 3339 format
Args:
timestamp_str: Timestamp string from parsedmarc
Returns:
RFC 3339 formatted timestamp
"""
try:
dt = human_timestamp_to_datetime(timestamp_str)
# Ensure timezone-aware datetime
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt.isoformat()
except Exception:
# Fallback to current time if parsing fails
return datetime.now(timezone.utc).isoformat()
def save_aggregate_report_to_google_secops(
self, aggregate_report: dict[str, Any]
) -> list[str]:
"""
Convert aggregate DMARC report to Google SecOps UDM format and send to Chronicle
When use_stdout=False: Events are sent to Chronicle API, returns empty list
When use_stdout=True: Returns list of NDJSON event strings for stdout
Args:
aggregate_report: Aggregate report dictionary from parsedmarc
Returns:
List of NDJSON event strings (empty if sent to API)
"""
logger.debug("Converting aggregate report to Google SecOps UDM format")
events = []
try:
report_metadata = aggregate_report["report_metadata"]
policy_published = aggregate_report["policy_published"]
for record in aggregate_report["records"]:
# Extract values
source_ip = record["source"]["ip_address"]
source_country = record["source"].get("country")
source_reverse_dns = record["source"].get("reverse_dns")
source_base_domain = record["source"].get("base_domain")
source_name = record["source"].get("name")
source_type = record["source"].get("type")
header_from = record["identifiers"]["header_from"]
envelope_from = record["identifiers"]["envelope_from"]
disposition = record["policy_evaluated"]["disposition"]
spf_aligned = record["alignment"]["spf"]
dkim_aligned = record["alignment"]["dkim"]
dmarc_pass = record["alignment"]["dmarc"]
count = record["count"]
interval_begin = record["interval_begin"]
# Get auth results
spf_results = record["auth_results"].get("spf", [])
dkim_results = record["auth_results"].get("dkim", [])
spf_result = spf_results[0]["result"] if spf_results else None
dkim_result = dkim_results[0]["result"] if dkim_results else None
# Build UDM event
event: dict[str, Any] = {
"event_type": "DMARC_AGGREGATE",
"metadata": {
"event_timestamp": self._format_timestamp(interval_begin),
"event_type": "GENERIC_EVENT",
"product_name": "parsedmarc",
"vendor_name": self.static_observer_vendor,
},
"principal": {
"ip": [source_ip],
},
"target": {
"domain": {
"name": header_from,
}
},
"security_result": [
{
"severity": self._get_severity(
disposition, spf_aligned, dkim_aligned
),
"description": self._get_description(
dmarc_pass,
spf_result,
dkim_result,
spf_aligned,
dkim_aligned,
disposition,
),
"detection_fields": [
{"key": "dmarc.disposition", "value": disposition},
{"key": "dmarc.policy", "value": policy_published["p"]},
{"key": "dmarc.pass", "value": dmarc_pass},
{"key": "dmarc.spf_aligned", "value": spf_aligned},
{"key": "dmarc.dkim_aligned", "value": dkim_aligned},
{"key": "dmarc.header_from", "value": header_from},
{"key": "dmarc.envelope_from", "value": envelope_from},
{"key": "dmarc.report_org", "value": report_metadata["org_name"]},
{"key": "dmarc.report_id", "value": report_metadata["report_id"]},
{"key": "dmarc.report_begin", "value": report_metadata["begin_date"]},
{"key": "dmarc.report_end", "value": report_metadata["end_date"]},
{"key": "dmarc.row_count", "value": count},
],
}
],
}
# Add optional fields to detection_fields
if spf_result:
event["security_result"][0]["detection_fields"].append(
{"key": "dmarc.spf_result", "value": spf_result}
)
if dkim_result:
event["security_result"][0]["detection_fields"].append(
{"key": "dmarc.dkim_result", "value": dkim_result}
)
if source_name:
event["security_result"][0]["detection_fields"].append(
{"key": "dmarc.source_service_name", "value": source_name}
)
if source_type:
event["security_result"][0]["detection_fields"].append(
{"key": "dmarc.source_service_type", "value": source_type}
)
# Add optional context fields (low-value, not for dashboarding)
additional_context = []
if source_base_domain:
additional_context.append(
{"key": "source_base_domain", "value": source_base_domain}
)
if self.static_environment:
additional_context.append(
{"key": "environment", "value": self.static_environment}
)
# Add SPF auth results to context
if spf_results:
for idx, spf in enumerate(spf_results):
additional_context.append(
{"key": f"spf_{idx}_domain", "value": spf.get("domain", "")}
)
additional_context.append(
{"key": f"spf_{idx}_result", "value": spf.get("result", "")}
)
# Add DKIM auth results to context
if dkim_results:
for idx, dkim in enumerate(dkim_results):
additional_context.append(
{"key": f"dkim_{idx}_domain", "value": dkim.get("domain", "")}
)
additional_context.append(
{"key": f"dkim_{idx}_result", "value": dkim.get("result", "")}
)
# Only add additional section if there's context to include
if additional_context:
event["additional"] = {"fields": additional_context}
# Add optional UDM fields
if source_country:
event["principal"]["location"] = {"country_or_region": source_country}
if source_reverse_dns:
event["principal"]["hostname"] = source_reverse_dns
if self.static_observer_name:
event["metadata"]["product_deployment_id"] = self.static_observer_name
events.append(json.dumps(event, ensure_ascii=False))
except Exception as e:
logger.error(f"Error converting aggregate report to Google SecOps format: {e}")
# Generate error event
error_event: dict[str, Any] = {
"event_type": "DMARC_PARSE_ERROR",
"metadata": {
"event_timestamp": datetime.now(timezone.utc).isoformat(),
"event_type": "GENERIC_EVENT",
"product_name": "parsedmarc",
"vendor_name": self.static_observer_vendor,
},
"security_result": [
{
"severity": "ERROR",
"description": f"Failed to parse DMARC aggregate report: {str(e)}",
}
],
}
events.append(json.dumps(error_event, ensure_ascii=False))
# Output events (to stdout or API)
self._output_events(events)
# Return events only if using stdout (for CLI to print)
return events if self.use_stdout else []
def save_forensic_report_to_google_secops(
self, forensic_report: dict[str, Any]
) -> list[str]:
"""
Convert forensic DMARC report to Google SecOps UDM format and send to Chronicle
When use_stdout=False: Events are sent to Chronicle API, returns empty list
When use_stdout=True: Returns list of NDJSON event strings for stdout
Args:
forensic_report: Forensic report dictionary from parsedmarc
Returns:
List of NDJSON event strings (empty if sent to API)
"""
logger.debug("Converting forensic report to Google SecOps UDM format")
events = []
try:
source_ip = forensic_report["source"]["ip_address"]
source_country = forensic_report["source"].get("country")
source_reverse_dns = forensic_report["source"].get("reverse_dns")
source_base_domain = forensic_report["source"].get("base_domain")
source_name = forensic_report["source"].get("name")
source_type = forensic_report["source"].get("type")
reported_domain = forensic_report["reported_domain"]
arrival_date = forensic_report["arrival_date_utc"]
auth_failure = forensic_report.get("auth_failure", [])
# Determine severity - forensic reports indicate failures
# Default to MEDIUM for authentication failures
severity = "MEDIUM"
# Build description
auth_failure_str = ", ".join(auth_failure) if auth_failure else "unknown"
description = f"DMARC forensic report: authentication failure ({auth_failure_str})"
# Build UDM event
event: dict[str, Any] = {
"event_type": "DMARC_FORENSIC",
"metadata": {
"event_timestamp": self._format_timestamp(arrival_date),
"event_type": "GENERIC_EVENT",
"product_name": "parsedmarc",
"vendor_name": self.static_observer_vendor,
},
"principal": {
"ip": [source_ip],
},
"target": {
"domain": {
"name": reported_domain,
}
},
"security_result": [
{
"severity": severity,
"description": description,
"detection_fields": [
{"key": "dmarc.auth_failure", "value": auth_failure_str},
{"key": "dmarc.reported_domain", "value": reported_domain},
],
}
],
}
# Add optional fields to detection_fields (matching aggregate report field names)
if source_name:
event["security_result"][0]["detection_fields"].append(
{"key": "dmarc.source_service_name", "value": source_name}
)
if source_type:
event["security_result"][0]["detection_fields"].append(
{"key": "dmarc.source_service_type", "value": source_type}
)
# Add optional context fields (low-value, not for dashboarding)
additional_context = []
if source_base_domain:
additional_context.append(
{"key": "source_base_domain", "value": source_base_domain}
)
if forensic_report.get("feedback_type"):
additional_context.append(
{"key": "feedback_type", "value": forensic_report["feedback_type"]}
)
if forensic_report.get("message_id"):
additional_context.append(
{"key": "message_id", "value": forensic_report["message_id"]}
)
if forensic_report.get("authentication_results"):
additional_context.append(
{"key": "authentication_results", "value": forensic_report["authentication_results"]}
)
if forensic_report.get("delivery_result"):
additional_context.append(
{"key": "delivery_result", "value": forensic_report["delivery_result"]}
)
if self.static_environment:
additional_context.append(
{"key": "environment", "value": self.static_environment}
)
# Add payload excerpt if enabled
if self.include_ruf_payload and forensic_report.get("sample"):
sample = forensic_report["sample"]
if len(sample) > self.ruf_payload_max_bytes:
sample = sample[:self.ruf_payload_max_bytes] + "... [truncated]"
additional_context.append(
{"key": "message_sample", "value": sample}
)
# Only add additional section if there's context to include
if additional_context:
event["additional"] = {"fields": additional_context}
# Add optional UDM fields
if source_country:
event["principal"]["location"] = {"country_or_region": source_country}
if source_reverse_dns:
event["principal"]["hostname"] = source_reverse_dns
if self.static_observer_name:
event["metadata"]["product_deployment_id"] = self.static_observer_name
events.append(json.dumps(event, ensure_ascii=False))
except Exception as e:
logger.error(f"Error converting forensic report to Google SecOps format: {e}")
# Generate error event
error_event: dict[str, Any] = {
"event_type": "DMARC_PARSE_ERROR",
"metadata": {
"event_timestamp": datetime.now(timezone.utc).isoformat(),
"event_type": "GENERIC_EVENT",
"product_name": "parsedmarc",
"vendor_name": self.static_observer_vendor,
},
"security_result": [
{
"severity": "ERROR",
"description": f"Failed to parse DMARC forensic report: {str(e)}",
}
],
}
events.append(json.dumps(error_event, ensure_ascii=False))
# Output events (to stdout or API)
self._output_events(events)
# Return events only if using stdout (for CLI to print)
return events if self.use_stdout else []
def save_smtp_tls_report_to_google_secops(
self, smtp_tls_report: dict[str, Any]
) -> list[str]:
"""
Convert SMTP TLS report to Google SecOps UDM format and send to Chronicle
When use_stdout=False: Events are sent to Chronicle API, returns empty list
When use_stdout=True: Returns list of NDJSON event strings for stdout
Args:
smtp_tls_report: SMTP TLS report dictionary from parsedmarc
Returns:
List of NDJSON event strings (empty if sent to API)
"""
logger.debug("Converting SMTP TLS report to Google SecOps UDM format")
events = []
try:
organization_name = smtp_tls_report.get("organization_name", "")
begin_date = smtp_tls_report["begin_date"]
end_date = smtp_tls_report["end_date"]
for policy in smtp_tls_report.get("policies", []):
policy_domain = policy["policy_domain"]
for failure in policy.get("failure_details", []):
# Build UDM event for each failure
event: dict[str, Any] = {
"event_type": "SMTP_TLS_REPORT",
"metadata": {
"event_timestamp": self._format_timestamp(begin_date),
"event_type": "GENERIC_EVENT",
"product_name": "parsedmarc",
"vendor_name": self.static_observer_vendor,
},
"target": {
"domain": {
"name": policy_domain,
}
},
"security_result": [
{
"severity": "LOW",
"description": f"SMTP TLS failure: {failure.get('result_type', 'unknown')}",
"detection_fields": [
{"key": "smtp_tls.policy_domain", "value": policy_domain},
{"key": "smtp_tls.result_type", "value": failure.get("result_type", "")},
{"key": "smtp_tls.failed_session_count", "value": failure.get("failed_session_count", 0)},
{"key": "smtp_tls.report_org", "value": organization_name},
{"key": "smtp_tls.report_begin", "value": begin_date},
{"key": "smtp_tls.report_end", "value": end_date},
],
}
],
}
# Add optional context fields (low-value, not for dashboarding)
additional_context = []
if self.static_environment:
additional_context.append(
{"key": "environment", "value": self.static_environment}
)
# Only add additional section if there's context to include
if additional_context:
event["additional"] = {"fields": additional_context}
# Add optional UDM fields
if failure.get("sending_mta_ip"):
event["principal"] = {"ip": [failure["sending_mta_ip"]]}
if self.static_observer_name:
event["metadata"]["product_deployment_id"] = self.static_observer_name
events.append(json.dumps(event, ensure_ascii=False))
except Exception as e:
logger.error(f"Error converting SMTP TLS report to Google SecOps format: {e}")
# Generate error event
error_event: dict[str, Any] = {
"event_type": "DMARC_PARSE_ERROR",
"metadata": {
"event_timestamp": datetime.now(timezone.utc).isoformat(),
"event_type": "GENERIC_EVENT",
"product_name": "parsedmarc",
"vendor_name": self.static_observer_vendor,
},
"security_result": [
{
"severity": "ERROR",
"description": f"Failed to parse SMTP TLS report: {str(e)}",
}
],
}
events.append(json.dumps(error_event, ensure_ascii=False))
# Output events (to stdout or API)
self._output_events(events)
# Return events only if using stdout (for CLI to print)
return events if self.use_stdout else []

View File

@@ -29,7 +29,7 @@ classifiers = [
"Operating System :: OS Independent",
"Programming Language :: Python :: 3"
]
requires-python = ">=3.9, <3.14"
requires-python = ">=3.9"
dependencies = [
"azure-identity>=1.8.0",
"azure-monitor-ingestion>=1.0.0",
@@ -48,7 +48,7 @@ dependencies = [
"imapclient>=2.1.0",
"kafka-python-ng>=2.2.2",
"lxml>=4.4.0",
"mailsuite>=1.11.1",
"mailsuite>=1.11.2",
"msgraph-core==0.2.2",
"opensearch-py>=2.4.2,<=3.0.0",
"publicsuffixlist>=0.10.0",

164
tests.py
View File

@@ -3,7 +3,6 @@
from __future__ import absolute_import, print_function, unicode_literals
import json
import os
import unittest
from glob import glob
@@ -157,169 +156,6 @@ class Test(unittest.TestCase):
parsedmarc.parsed_smtp_tls_reports_to_csv(parsed_report)
print("Passed!")
def testGoogleSecOpsAggregateReport(self):
"""Test Google SecOps aggregate report conversion"""
print()
from parsedmarc.google_secops import GoogleSecOpsClient
client = GoogleSecOpsClient(use_stdout=True)
sample_path = "samples/aggregate/example.net!example.com!1529366400!1529452799.xml"
print("Testing Google SecOps aggregate conversion for {0}: ".format(sample_path), end="")
parsed_file = parsedmarc.parse_report_file(sample_path, always_use_local_files=True)
parsed_report = parsed_file["report"]
events = client.save_aggregate_report_to_google_secops(parsed_report)
# Verify we got events
assert len(events) > 0, "Expected at least one event"
# Verify each event is valid JSON
for event in events:
event_dict = json.loads(event)
assert "event_type" in event_dict
assert event_dict["event_type"] == "DMARC_AGGREGATE"
assert "metadata" in event_dict
assert "principal" in event_dict
assert "target" in event_dict
assert "security_result" in event_dict
print("Passed!")
def testGoogleSecOpsForensicReport(self):
"""Test Google SecOps forensic report conversion"""
print()
from parsedmarc.google_secops import GoogleSecOpsClient
# Test without payload
client = GoogleSecOpsClient(include_ruf_payload=False, use_stdout=True)
sample_path = "samples/forensic/dmarc_ruf_report_linkedin.eml"
print("Testing Google SecOps forensic conversion (no payload) for {0}: ".format(sample_path), end="")
parsed_file = parsedmarc.parse_report_file(sample_path)
parsed_report = parsed_file["report"]
events = client.save_forensic_report_to_google_secops(parsed_report)
# Verify we got events
assert len(events) > 0, "Expected at least one event"
# Verify each event is valid JSON
for event in events:
event_dict = json.loads(event)
assert "event_type" in event_dict
assert event_dict["event_type"] == "DMARC_FORENSIC"
# Verify no payload in additional fields
if "additional" in event_dict and "fields" in event_dict["additional"]:
for field in event_dict["additional"]["fields"]:
assert field["key"] != "message_sample", "Payload should not be included when disabled"
print("Passed!")
# Test with payload
client_with_payload = GoogleSecOpsClient(
include_ruf_payload=True,
ruf_payload_max_bytes=100,
use_stdout=True
)
print("Testing Google SecOps forensic conversion (with payload) for {0}: ".format(sample_path), end="")
events_with_payload = client_with_payload.save_forensic_report_to_google_secops(parsed_report)
# Verify we got events
assert len(events_with_payload) > 0, "Expected at least one event"
# Verify payload is included
for event in events_with_payload:
event_dict = json.loads(event)
# Check if message_sample is in additional fields
has_sample = False
if "additional" in event_dict and "fields" in event_dict["additional"]:
for field in event_dict["additional"]["fields"]:
if field["key"] == "message_sample":
has_sample = True
# Verify truncation: max_bytes (100) + "... [truncated]" suffix (16 chars)
# Allow some margin for the actual payload length
max_expected_length = 100 + len("... [truncated]") + 10
assert len(field["value"]) <= max_expected_length, f"Payload should be truncated, got {len(field['value'])} bytes"
break
assert has_sample, "Payload should be included when enabled"
print("Passed!")
def testGoogleSecOpsConfiguration(self):
"""Test Google SecOps client configuration"""
print()
from parsedmarc.google_secops import GoogleSecOpsClient
print("Testing Google SecOps client configuration: ", end="")
# Test stdout configuration
client1 = GoogleSecOpsClient(use_stdout=True)
assert client1.include_ruf_payload is False
assert client1.ruf_payload_max_bytes == 4096
assert client1.static_observer_vendor == "parsedmarc"
assert client1.static_observer_name is None
assert client1.static_environment is None
assert client1.use_stdout is True
# Test custom configuration
client2 = GoogleSecOpsClient(
include_ruf_payload=True,
ruf_payload_max_bytes=8192,
static_observer_name="test-observer",
static_observer_vendor="test-vendor",
static_environment="prod",
use_stdout=True
)
assert client2.include_ruf_payload is True
assert client2.ruf_payload_max_bytes == 8192
assert client2.static_observer_name == "test-observer"
assert client2.static_observer_vendor == "test-vendor"
assert client2.static_environment == "prod"
print("Passed!")
def testGoogleSecOpsSmtpTlsReport(self):
"""Test Google SecOps SMTP TLS report conversion"""
print()
from parsedmarc.google_secops import GoogleSecOpsClient
client = GoogleSecOpsClient(use_stdout=True)
sample_path = "samples/smtp_tls/rfc8460.json"
print("Testing Google SecOps SMTP TLS conversion for {0}: ".format(sample_path), end="")
parsed_file = parsedmarc.parse_report_file(sample_path)
parsed_report = parsed_file["report"]
events = client.save_smtp_tls_report_to_google_secops(parsed_report)
# Verify we got events
assert len(events) > 0, "Expected at least one event"
# Verify each event is valid JSON
for event in events:
event_dict = json.loads(event)
assert "event_type" in event_dict
assert event_dict["event_type"] == "SMTP_TLS_REPORT"
assert "metadata" in event_dict
assert "target" in event_dict
assert "security_result" in event_dict
# Verify failed_session_count is in detection_fields as an integer
found_count = False
for field in event_dict["security_result"][0]["detection_fields"]:
if field["key"] == "smtp_tls.failed_session_count":
assert isinstance(field["value"], int), "failed_session_count should be an integer"
found_count = True
break
assert found_count, "failed_session_count should be in detection_fields"
print("Passed!")
if __name__ == "__main__":
unittest.main(verbosity=2)