Mirror of https://github.com/domainaware/parsedmarc.git, synced 2026-03-07 07:11:24 +00:00

Compare commits: 9.1.2...copilot/ad (16 commits)
| Commit |
|---|
| dab2aaffda |
| 19e8b498d0 |
| 91ae56c029 |
| e0818a22f4 |
| da43efa4bf |
| cf916509ea |
| 6ad7233983 |
| 63f8334e27 |
| 1aa0147c33 |
| e9b4170591 |
| d3a314171f |
| b7823253a4 |
| 1887460ab6 |
| c84ddb4e89 |
| 78c863bd12 |
| 12b9b37026 |

.github/workflows/python-tests.yml (vendored; 2 lines changed)

```diff
@@ -30,7 +30,7 @@ jobs:
     strategy:
       fail-fast: false
       matrix:
-        python-version: ["3.10", "3.11", "3.12", "3.13", "3.14"]
+        python-version: ["3.9", "3.10", "3.11", "3.12", "3.13"]

     steps:
       - uses: actions/checkout@v5
```

AGENTS.md (deleted; 64 lines removed)

# AGENTS.md

This file provides guidance to AI agents when working with code in this repository.

## Project Overview

parsedmarc is a Python module and CLI utility for parsing DMARC aggregate (RUA), forensic (RUF), and SMTP TLS reports. It reads reports from IMAP, Microsoft Graph, Gmail API, Maildir, mbox files, or direct file paths, and outputs to JSON/CSV, Elasticsearch, OpenSearch, Splunk, Kafka, S3, Azure Log Analytics, syslog, or webhooks.

## Common Commands

```bash
# Install with dev/build dependencies
pip install .[build]

# Run all tests with coverage
pytest --cov --cov-report=xml tests.py

# Run a single test
pytest tests.py::Test::testAggregateSamples

# Lint and format
ruff check .
ruff format .

# Test CLI with sample reports
parsedmarc --debug -c ci.ini samples/aggregate/*
parsedmarc --debug -c ci.ini samples/forensic/*

# Build docs
cd docs && make html

# Build distribution
hatch build
```

To skip DNS lookups during testing, set `GITHUB_ACTIONS=true`.
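For example, for a single run:

```bash
GITHUB_ACTIONS=true pytest --cov --cov-report=xml tests.py
```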

## Architecture

**Data flow:** Input sources → CLI (`cli.py:_main`) → Parse (`__init__.py`) → Enrich (DNS/GeoIP via `utils.py`) → Output integrations

### Key modules

- `parsedmarc/__init__.py` — Core parsing logic. Main functions: `parse_report_file()`, `parse_report_email()`, `parse_aggregate_report_xml()`, `parse_forensic_report()`, `parse_smtp_tls_report_json()`, `get_dmarc_reports_from_mailbox()`, `watch_inbox()`
- `parsedmarc/cli.py` — CLI entry point (`_main`), config file parsing, output orchestration
- `parsedmarc/types.py` — TypedDict definitions for all report types (`AggregateReport`, `ForensicReport`, `SMTPTLSReport`, `ParsingResults`)
- `parsedmarc/utils.py` — IP/DNS/GeoIP enrichment, base64 decoding, compression handling
- `parsedmarc/mail/` — Polymorphic mail connections: `IMAPConnection`, `GmailConnection`, `MSGraphConnection`, `MaildirConnection`
- `parsedmarc/{elastic,opensearch,splunk,kafkaclient,loganalytics,syslog,s3,webhook,gelf}.py` — Output integrations

### Report type system

`ReportType = Literal["aggregate", "forensic", "smtp_tls"]`. Exception hierarchy: `ParserError` → `InvalidDMARCReport` → `InvalidAggregateReport`/`InvalidForensicReport`, and `InvalidSMTPTLSReport`.
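Read as Python, the hierarchy is roughly as follows (treating `InvalidSMTPTLSReport` as a child of `InvalidDMARCReport` is an assumption based on the phrasing above):

```python
class ParserError(Exception): ...                      # base parsing error
class InvalidDMARCReport(ParserError): ...
class InvalidAggregateReport(InvalidDMARCReport): ...
class InvalidForensicReport(InvalidDMARCReport): ...
class InvalidSMTPTLSReport(InvalidDMARCReport): ...    # assumed parent
```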

### Caching

IP address info cached for 4 hours, seen aggregate report IDs cached for 1 hour (via `ExpiringDict`).
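A minimal sketch with the `expiringdict` package (the cache names and `max_len` values here are illustrative assumptions; only the TTLs come from the note above):

```python
from expiringdict import ExpiringDict

IP_ADDRESS_CACHE = ExpiringDict(max_len=10000, max_age_seconds=4 * 3600)  # 4 hours
SEEN_AGGREGATE_REPORT_IDS = ExpiringDict(max_len=10000, max_age_seconds=3600)  # 1 hour
```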

## Code Style

- Ruff for formatting and linting (configured in `.vscode/settings.json`)
- TypedDict for structured data, type hints throughout
- Python ≥3.10 required
- Tests are in a single `tests.py` file using unittest; sample reports live in `samples/`

CHANGELOG.md (28 lines changed)

```diff
@@ -1,33 +1,5 @@
 # Changelog

-## 9.1.2
-
-### Fixes
-
-- Fix duplicate detection for normalized aggregate reports in Elasticsearch/OpenSearch (PR #666 fixes issue #665)
-
-## 9.1.1
-
-### Fixes
-
-- Fix the use of Elasticsearch and OpenSearch API keys (PR #660 fixes issue #653)
-
-### Changes
-
-- Drop support for Python 3.9 (PR #661)
-
-## 9.1.0
-
-### Enhancements
-
-- Add TCP and TLS support for syslog output. (#656)
-- Skip DNS lookups in GitHub Actions to prevent DNS timeouts during tests. (#657)
-- Remove microseconds from DMARC aggregate report time ranges before parsing them.
-
-## 9.0.10
-
-- Support Python 3.14+
-
 ## 9.0.9

 ### Fixes
```

```diff
@@ -44,6 +44,7 @@ Thanks to all
 - Optionally send the results to Elasticsearch, Opensearch, and/or Splunk, for
   use with premade dashboards
 - Optionally send reports to Apache Kafka
+- Optionally send reports to Google SecOps (Chronicle) in UDM format via API or stdout

 ## Python Compatibility

@@ -56,9 +57,9 @@ for RHEL or Debian.
 | 3.6 | ❌ | Used in RHEL 8, but not supported by project dependencies |
 | 3.7 | ❌ | End of Life (EOL) |
 | 3.8 | ❌ | End of Life (EOL) |
-| 3.9 | ❌ | Used in Debian 11 and RHEL 9, but not supported by project dependencies |
+| 3.9 | ✅ | Supported until August 2026 (Debian 11); May 2032 (RHEL 9) |
 | 3.10 | ✅ | Actively maintained |
 | 3.11 | ✅ | Actively maintained; supported until June 2028 (Debian 12) |
 | 3.12 | ✅ | Actively maintained; supported until May 2035 (RHEL 10) |
 | 3.13 | ✅ | Actively maintained; supported until June 2030 (Debian 13) |
-| 3.14 | ✅ | Supported (requires `imapclient>=3.1.0`) |
+| 3.14 | ❌ | Not currently supported due to [this imapclient bug](https://github.com/mjs/imapclient/issues/618)|
```

ci.ini (1 line changed)

```diff
@@ -3,7 +3,6 @@ save_aggregate = True
 save_forensic = True
 save_smtp_tls = True
 debug = True
 offline = True

 [elasticsearch]
 hosts = http://localhost:9200
```

docs/source/google_secops.md (new file; 494 lines)

# Google SecOps (Chronicle) Output

`parsedmarc` can output DMARC reports to Google SecOps (Chronicle) in UDM (Unified Data Model) format.

## Configuration

To enable Google SecOps output, add a `[google_secops]` section to your configuration file:

### Primary Method: Chronicle Ingestion API

The recommended approach is to send events directly to Chronicle via the Ingestion API:

```ini
[general]
save_aggregate = True
save_forensic = True

[google_secops]
# Required: Path to Google service account JSON credentials file
api_credentials_file = /path/to/service-account-credentials.json

# Required: Chronicle customer ID
api_customer_id = your-customer-id-here

# Optional: Chronicle region (default: us)
# Options: us, europe, asia-southeast1, me-central2, australia-southeast1
api_region = us

# Optional: Log type for Chronicle ingestion (default: DMARC)
api_log_type = DMARC

# Optional: Include forensic report message payload (default: False)
# For privacy, message bodies are excluded by default
include_ruf_payload = False

# Optional: Maximum bytes of forensic message payload to include (default: 4096)
ruf_payload_max_bytes = 4096

# Optional: Static observer name for telemetry identification
static_observer_name = my-parsedmarc-instance

# Optional: Static observer vendor (default: parsedmarc)
static_observer_vendor = parsedmarc

# Optional: Static environment label (e.g., prod, dev)
static_environment = prod
```

### Alternative Method: stdout Output

If you prefer to use an external log shipper (Fluentd, Logstash, Chronicle forwarder), set `use_stdout = True`:

```ini
[google_secops]
# Output to stdout instead of Chronicle API
use_stdout = True

# Other optional configuration options (as above)
include_ruf_payload = False
ruf_payload_max_bytes = 4096
static_observer_name = my-instance
static_observer_vendor = parsedmarc
static_environment = prod
```

## Output Format

The Google SecOps output produces newline-delimited JSON (NDJSON) in Chronicle UDM format, which can be ingested into Google SecOps for hunting and dashboarding.
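Concretely, each event is one complete JSON object per line; two abbreviated lines (full events are shown under Example Output below):

```json
{"event_type": "DMARC_AGGREGATE", "metadata": {"event_type": "GENERIC_EVENT", "product_name": "parsedmarc"}}
{"event_type": "DMARC_FORENSIC", "metadata": {"event_type": "GENERIC_EVENT", "product_name": "parsedmarc"}}
```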

### Event Types

1. **DMARC_AGGREGATE**: One event per aggregate report row, preserving count and period information
2. **DMARC_FORENSIC**: One event per forensic report
3. **SMTP_TLS_REPORT**: One event per SMTP TLS failure detail
4. **DMARC_PARSE_ERROR**: Generated when parsing fails (does not crash)

### UDM Schema

Each event includes:

- **metadata**: Event timestamp, type, product name, and vendor
  - `event_timestamp`: RFC 3339 formatted timestamp
  - `event_type`: Always "GENERIC_EVENT" for UDM
  - `product_name`: Always "parsedmarc"
  - `vendor_name`: Configurable via `static_observer_vendor` (default: "parsedmarc")
  - `product_deployment_id` (optional): Set via `static_observer_name` config

- **principal**: Source IP address, location (country), and hostname (reverse DNS)
  - `ip`: Array containing source IP address (always present)
  - `location.country_or_region` (optional): ISO country code from IP geolocation
  - `hostname` (optional): Reverse DNS hostname for source IP

- **target**: Domain name (from DMARC policy)
  - `domain.name`: The domain being protected by DMARC

- **security_result**: Severity level, description, and detection fields for dashboarding
  - `severity`: Derived severity level (HIGH/MEDIUM/LOW/ERROR)
  - `description`: Human-readable event description
  - **detection_fields**: Key DMARC dimensions for filtering and grouping
    - All dashboard-relevant fields use `dmarc.*` or `smtp_tls.*` prefixes for easy identification
    - Includes IP enrichment data (service name and type from reverse DNS mapping) for enhanced filtering
    - See "Detection Fields" section below for complete field listings

- **additional.fields** (optional): Low-value context fields not typically used for dashboarding
  - SPF/DKIM authentication details (e.g., `spf_0_domain`, `spf_0_result`)
  - Forensic report metadata (e.g., `feedback_type`, `message_id`, `authentication_results`)
  - Base domain, environment tags, optional message samples

**Design Rationale**: DMARC dimensions are placed in `security_result[].detection_fields` rather than `additional.fields` because Chronicle dashboards, stats searches, and aggregations work best with UDM label arrays. `additional.fields` is a protobuf Struct intended for opaque context and is not reliably queryable for dashboard operations.

### Detection Fields

**Aggregate Report Fields** (`DMARC_AGGREGATE` events):

- `dmarc.disposition`: DMARC policy action (none, quarantine, reject)
- `dmarc.policy`: Published DMARC policy (none, quarantine, reject)
- `dmarc.pass`: Boolean - overall DMARC authentication result
- `dmarc.spf_aligned`: Boolean - SPF alignment status
- `dmarc.dkim_aligned`: Boolean - DKIM alignment status
- `dmarc.header_from`: Header From domain
- `dmarc.envelope_from`: Envelope From domain (MAIL FROM)
- `dmarc.report_org`: Reporting organization name
- `dmarc.report_id`: Unique report identifier
- `dmarc.report_begin`: Report period start timestamp
- `dmarc.report_end`: Report period end timestamp
- `dmarc.row_count`: Number of messages represented by this record
- `dmarc.spf_result` (optional): SPF authentication result (pass, fail, etc.)
- `dmarc.dkim_result` (optional): DKIM authentication result (pass, fail, etc.)
- `dmarc.source_service_name` (optional): Enriched service name from reverse DNS mapping
- `dmarc.source_service_type` (optional): Enriched service type (e.g., "Email Provider", "Webmail", "Marketing")

**Forensic Report Fields** (`DMARC_FORENSIC` events):

- `dmarc.auth_failure`: Authentication failure type(s) (dmarc, spf, dkim)
- `dmarc.reported_domain`: Domain that failed DMARC authentication
- `dmarc.source_service_name` (optional): Enriched service name from reverse DNS mapping
- `dmarc.source_service_type` (optional): Enriched service type (e.g., "Email Provider", "Webmail", "Marketing")

**SMTP TLS Report Fields** (`SMTP_TLS_REPORT` events):

- `smtp_tls.policy_domain`: Domain being monitored for TLS policy
- `smtp_tls.result_type`: Type of TLS failure (certificate-expired, validation-failure, etc.)
- `smtp_tls.failed_session_count`: Number of failed sessions
- `smtp_tls.report_org`: Reporting organization name
- `smtp_tls.report_begin`: Report period start timestamp
- `smtp_tls.report_end`: Report period end timestamp

### Severity Heuristics

- **HIGH**: DMARC disposition = reject
- **MEDIUM**: DMARC disposition = quarantine with neither SPF nor DKIM aligned
- **LOW**: everything else (disposition = none, or quarantine with at least one aligned identifier), as in the sketch below
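A minimal Python sketch of this mapping, mirroring the `_get_severity` helper in `parsedmarc/google_secops.py` (shown in full later in this diff):

```python
def get_severity(disposition: str, spf_aligned: bool, dkim_aligned: bool) -> str:
    """Map a record's disposition and alignment results onto an event severity."""
    if disposition == "reject":
        return "HIGH"
    if disposition == "quarantine" and not (spf_aligned or dkim_aligned):
        return "MEDIUM"
    return "LOW"
```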

## Example Output

### Aggregate Report Event

```json
{
  "event_type": "DMARC_AGGREGATE",
  "metadata": {
    "event_timestamp": "2018-06-19T00:00:00+00:00",
    "event_type": "GENERIC_EVENT",
    "product_name": "parsedmarc",
    "vendor_name": "parsedmarc"
  },
  "principal": {
    "ip": ["199.230.200.36"],
    "location": {"country_or_region": "US"}
  },
  "target": {
    "domain": {"name": "example.com"}
  },
  "security_result": [{
    "severity": "LOW",
    "description": "DMARC fail; SPF=pass; DKIM=pass; SPF not aligned; DKIM not aligned; disposition=none",
    "detection_fields": [
      {"key": "dmarc.disposition", "value": "none"},
      {"key": "dmarc.policy", "value": "none"},
      {"key": "dmarc.pass", "value": false},
      {"key": "dmarc.spf_aligned", "value": false},
      {"key": "dmarc.dkim_aligned", "value": false},
      {"key": "dmarc.header_from", "value": "example.com"},
      {"key": "dmarc.envelope_from", "value": "example.com"},
      {"key": "dmarc.report_org", "value": "example.net"},
      {"key": "dmarc.report_id", "value": "b043f0e264cf4ea995e93765242f6dfb"},
      {"key": "dmarc.report_begin", "value": "2018-06-19 00:00:00"},
      {"key": "dmarc.report_end", "value": "2018-06-19 23:59:59"},
      {"key": "dmarc.row_count", "value": 1},
      {"key": "dmarc.spf_result", "value": "pass"},
      {"key": "dmarc.dkim_result", "value": "pass"},
      {"key": "dmarc.source_service_name", "value": "Example Mail Service"},
      {"key": "dmarc.source_service_type", "value": "Email Provider"}
    ]
  }],
  "additional": {
    "fields": [
      {"key": "spf_0_domain", "value": "example.com"},
      {"key": "spf_0_result", "value": "pass"},
      {"key": "dkim_0_domain", "value": "example.com"},
      {"key": "dkim_0_result", "value": "pass"}
    ]
  }
}
```

### Forensic Report Event

```json
{
  "event_type": "DMARC_FORENSIC",
  "metadata": {
    "event_timestamp": "2019-04-30T02:09:00+00:00",
    "event_type": "GENERIC_EVENT",
    "product_name": "parsedmarc",
    "vendor_name": "parsedmarc"
  },
  "principal": {
    "ip": ["10.10.10.10"],
    "location": {"country_or_region": "US"},
    "hostname": "mail.example-sender.com"
  },
  "target": {
    "domain": {"name": "example.com"}
  },
  "security_result": [{
    "severity": "MEDIUM",
    "description": "DMARC forensic report: authentication failure (dmarc)",
    "detection_fields": [
      {"key": "dmarc.auth_failure", "value": "dmarc"},
      {"key": "dmarc.reported_domain", "value": "example.com"},
      {"key": "dmarc.source_service_name", "value": "Example Mail Provider"},
      {"key": "dmarc.source_service_type", "value": "Email Provider"}
    ]
  }],
  "additional": {
    "fields": [
      {"key": "feedback_type", "value": "auth-failure"},
      {"key": "message_id", "value": "<01010101010101010101010101010101@ABAB01MS0016.someserver.loc>"},
      {"key": "authentication_results", "value": "dmarc=fail (p=none; dis=none) header.from=example.com"},
      {"key": "delivery_result", "value": "delivered"}
    ]
  }
}
```

### SMTP TLS Report Event

```json
{
  "event_type": "SMTP_TLS_REPORT",
  "metadata": {
    "event_timestamp": "2016-04-01T00:00:00+00:00",
    "event_type": "GENERIC_EVENT",
    "product_name": "parsedmarc",
    "vendor_name": "parsedmarc"
  },
  "target": {
    "domain": {
      "name": "company-y.example"
    }
  },
  "security_result": [{
    "severity": "LOW",
    "description": "SMTP TLS failure: certificate-expired",
    "detection_fields": [
      {"key": "smtp_tls.policy_domain", "value": "company-y.example"},
      {"key": "smtp_tls.result_type", "value": "certificate-expired"},
      {"key": "smtp_tls.failed_session_count", "value": 100},
      {"key": "smtp_tls.report_org", "value": "Company-X"},
      {"key": "smtp_tls.report_begin", "value": "2016-04-01T00:00:00Z"},
      {"key": "smtp_tls.report_end", "value": "2016-04-01T23:59:59Z"}
    ]
  }],
  "principal": {
    "ip": ["2001:db8:abcd:0012::1"]
  }
}
```

### Parse Error Event

```json
{
  "event_type": "DMARC_PARSE_ERROR",
  "metadata": {
    "event_timestamp": "2026-01-09T16:22:10.933751+00:00",
    "event_type": "GENERIC_EVENT",
    "product_name": "parsedmarc",
    "vendor_name": "parsedmarc"
  },
  "security_result": [{
    "severity": "ERROR",
    "description": "Failed to parse DMARC report: Invalid XML structure"
  }]
}
```

## Google SecOps Searches

Here are some example YARA-L rules you can use in Google SecOps to hunt for DMARC issues:

### Find all DMARC aggregate report failures

```yara-l
rule dmarc_aggregate_failures {
  meta:
    author = "parsedmarc"
    description = "Detect DMARC authentication failures in aggregate reports"

  events:
    $e.metadata.product_name = "parsedmarc"
    $e.event_type = "DMARC_AGGREGATE"
    $e.security_result.detection_fields.key = "dmarc.pass"
    $e.security_result.detection_fields.value = false

  condition:
    $e
}
```

### Find high severity DMARC events (rejected mail)

```yara-l
rule high_severity_dmarc_events {
  meta:
    author = "parsedmarc"
    description = "Detect high severity DMARC aggregate events (rejected mail)"

  events:
    $e.metadata.product_name = "parsedmarc"
    $e.event_type = "DMARC_AGGREGATE"
    $e.security_result.severity = "HIGH"

  condition:
    $e
}
```

### Find repeated DMARC failures from same source IP

```yara-l
rule repeated_dmarc_failures {
  meta:
    author = "parsedmarc"
    description = "Detect repeated DMARC failures from the same source IP"

  events:
    $e.metadata.product_name = "parsedmarc"
    $e.event_type = "DMARC_AGGREGATE"
    $e.security_result.detection_fields.key = "dmarc.pass"
    $e.security_result.detection_fields.value = false
    $e.principal.ip = $source_ip

  match:
    $source_ip over 1h

  condition:
    #e > 5
}
```

### Find DMARC forensic reports with authentication failures

```yara-l
rule dmarc_forensic_failures {
  meta:
    author = "parsedmarc"
    description = "Detect DMARC forensic reports with authentication failures"

  events:
    $e.metadata.product_name = "parsedmarc"
    $e.event_type = "DMARC_FORENSIC"
    $e.security_result.detection_fields.key = "dmarc.auth_failure"

  condition:
    $e
}
```

### Find DMARC failures from specific mail service types

```yara-l
rule dmarc_failures_by_service_type {
  meta:
    author = "parsedmarc"
    description = "Detect DMARC failures from specific mail service types"

  events:
    $e.metadata.product_name = "parsedmarc"
    $e.event_type = "DMARC_AGGREGATE"
    $e.security_result.detection_fields.key = "dmarc.pass"
    $e.security_result.detection_fields.value = false
    $e.security_result.detection_fields.key = "dmarc.source_service_type"
    $e.security_result.detection_fields.value = "Email Provider"

  condition:
    $e
}
```

### Find SMTP TLS failures

```yara-l
rule smtp_tls_failures {
  meta:
    author = "parsedmarc"
    description = "Detect SMTP TLS failures"

  events:
    $e.metadata.product_name = "parsedmarc"
    $e.event_type = "SMTP_TLS_REPORT"

  condition:
    $e
}
```

## Privacy Considerations

By default, forensic report message bodies are **excluded** from the output to protect privacy. If you need to include message samples for investigation:

1. Set `include_ruf_payload = True` in your configuration
2. Adjust `ruf_payload_max_bytes` to limit the amount of data included (default: 4096 bytes)
3. Message samples will be truncated if they exceed the configured maximum

**Note**: Be aware of data privacy regulations (GDPR, CCPA, etc.) when including message payloads in security telemetry.
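For example, a configuration that opts in with a tighter cap (the 2048-byte value is illustrative):

```ini
[google_secops]
include_ruf_payload = True
ruf_payload_max_bytes = 2048
```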

## Usage

The Google SecOps output works with all parsedmarc input methods, including file processing and mailbox monitoring.

### Primary Method: Direct API Ingestion

With Chronicle Ingestion API configured, events are sent directly to Chronicle:

```bash
# Process files - events are sent to Chronicle API automatically
parsedmarc -c config.ini samples/aggregate/*.xml

# Monitor mailbox - events are sent to Chronicle API in real-time
parsedmarc -c config.ini
```

No additional log shippers or pipelines are needed. The Google SecOps client handles authentication and batching automatically.
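For reference, the authentication it performs is standard google-auth service-account usage; a condensed sketch of what `GoogleSecOpsClient` does internally (see `parsedmarc/google_secops.py` later in this diff):

```python
from google.auth.transport.requests import Request
from google.oauth2 import service_account

credentials = service_account.Credentials.from_service_account_file(
    "/path/to/service-account-credentials.json",  # api_credentials_file
    scopes=["https://www.googleapis.com/auth/cloud-platform"],
)
if not credentials.valid:
    credentials.refresh(Request())  # fetch or refresh the OAuth bearer token
headers = {"Authorization": f"Bearer {credentials.token}"}
```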

### Alternative Method: stdout Output with Log Shipper

If using `use_stdout = True` in your configuration, output DMARC reports to an external log shipper:

#### Processing Files

```bash
# Output to stdout
parsedmarc -c config.ini samples/aggregate/*.xml > dmarc_events.ndjson

# Stream to file
parsedmarc -c config.ini samples/aggregate/*.xml >> /var/log/dmarc/events.ndjson

# Pipe to log shipper (e.g., Fluentd, Logstash, Chronicle forwarder)
parsedmarc -c config.ini samples/aggregate/*.xml | fluentd
```

#### Monitoring Mailboxes

The Google SecOps output automatically works when monitoring mailboxes via IMAP, Microsoft Graph, or Gmail API. Configure your mailbox connection and enable watching:

```ini
[general]
save_aggregate = True
save_forensic = True

[mailbox]
watch = True
delete = False
batch_size = 10

[imap]
host = imap.example.com
user = dmarc@example.com
password = yourpassword

[google_secops]
# Use stdout mode for log shipper integration
use_stdout = True
include_ruf_payload = False
static_observer_name = mailbox-monitor
static_environment = prod
```

When watching a mailbox with stdout mode, parsedmarc continuously outputs UDM events as new reports arrive:

```bash
parsedmarc -c config.ini | fluentd
```

The output is in newline-delimited JSON format, with one UDM event per line, ready for collection by your log shipper.

````diff
@@ -44,6 +44,7 @@ and Valimail.
 - Optionally send the results to Elasticsearch, Opensearch, and/or Splunk, for use
   with premade dashboards
 - Optionally send reports to Apache Kafka
+- Optionally send reports to Google SecOps (Chronicle) in UDM format

 ## Python Compatibility

@@ -56,12 +57,12 @@ for RHEL or Debian.
 | 3.6 | ❌ | Used in RHEL 8, but not supported by project dependencies |
 | 3.7 | ❌ | End of Life (EOL) |
 | 3.8 | ❌ | End of Life (EOL) |
-| 3.9 | ❌ | Used in Debian 11 and RHEL 9, but not supported by project dependencies |
+| 3.9 | ✅ | Supported until August 2026 (Debian 11); May 2032 (RHEL 9) |
 | 3.10 | ✅ | Actively maintained |
 | 3.11 | ✅ | Actively maintained; supported until June 2028 (Debian 12) |
 | 3.12 | ✅ | Actively maintained; supported until May 2035 (RHEL 10) |
 | 3.13 | ✅ | Actively maintained; supported until June 2030 (Debian 13) |
-| 3.14 | ✅ | Supported (requires `imapclient>=3.1.0`) |
+| 3.14 | ❌ | Not currently supported due to [this imapclient bug](https://github.com/mjs/imapclient/issues/618)|

 ```{toctree}
 :caption: 'Contents'

@@ -74,6 +75,7 @@ elasticsearch
 opensearch
 kibana
 splunk
+google_secops
 davmail
 dmarc
 contributing
````

````diff
@@ -162,10 +162,10 @@ sudo -u parsedmarc virtualenv /opt/parsedmarc/venv
 ```

 CentOS/RHEL 8 systems use Python 3.6 by default, so on those systems
-explicitly tell `virtualenv` to use `python3.10` instead
+explicitly tell `virtualenv` to use `python3.9` instead

 ```bash
-sudo -u parsedmarc virtualenv -p python3.10 /opt/parsedmarc/venv
+sudo -u parsedmarc virtualenv -p python3.9 /opt/parsedmarc/venv
 ```

 Activate the virtualenv
````

```diff
@@ -171,8 +171,8 @@ The full set of configuration options are:
   - `check_timeout` - int: Number of seconds to wait for an IMAP
     IDLE response or the number of seconds until the next
     mail check (Default: `30`)
   - `since` - str: Search for messages since certain time. (Examples: `5m|3h|2d|1w`)
     Acceptable units - {"m":"minutes", "h":"hours", "d":"days", "w":"weeks"}.
     Defaults to `1d` if incorrect value is provided.
 - `imap`
   - `host` - str: The IMAP server hostname or IP address
```

````diff
@@ -240,7 +240,7 @@ The full set of configuration options are:
     group and use that as the group id.

     ```powershell
     New-ApplicationAccessPolicy -AccessRight RestrictAccess
       -AppId "<CLIENT_ID>" -PolicyScopeGroupId "<MAILBOX>"
       -Description "Restrict access to dmarc reports mailbox."
     ```
````

````diff
@@ -336,65 +336,13 @@ The full set of configuration options are:
   - `secret_access_key` - str: The secret access key (Optional)
 - `syslog`
   - `server` - str: The Syslog server name or IP address
-  - `port` - int: The port to use (Default: `514`)
-  - `protocol` - str: The protocol to use: `udp`, `tcp`, or `tls` (Default: `udp`)
-  - `cafile_path` - str: Path to CA certificate file for TLS server verification (Optional)
-  - `certfile_path` - str: Path to client certificate file for TLS authentication (Optional)
-  - `keyfile_path` - str: Path to client private key file for TLS authentication (Optional)
-  - `timeout` - float: Connection timeout in seconds for TCP/TLS (Default: `5.0`)
-  - `retry_attempts` - int: Number of retry attempts for failed connections (Default: `3`)
-  - `retry_delay` - int: Delay in seconds between retry attempts (Default: `5`)
-
-  **Example UDP configuration (default):**
-
-  ```ini
-  [syslog]
-  server = syslog.example.com
-  port = 514
-  ```
-
-  **Example TCP configuration:**
-
-  ```ini
-  [syslog]
-  server = syslog.example.com
-  port = 6514
-  protocol = tcp
-  timeout = 10.0
-  retry_attempts = 5
-  ```
-
-  **Example TLS configuration with server verification:**
-
-  ```ini
-  [syslog]
-  server = syslog.example.com
-  port = 6514
-  protocol = tls
-  cafile_path = /path/to/ca-cert.pem
-  timeout = 10.0
-  ```
-
-  **Example TLS configuration with mutual authentication:**
-
-  ```ini
-  [syslog]
-  server = syslog.example.com
-  port = 6514
-  protocol = tls
-  cafile_path = /path/to/ca-cert.pem
-  certfile_path = /path/to/client-cert.pem
-  keyfile_path = /path/to/client-key.pem
-  timeout = 10.0
-  retry_attempts = 3
-  retry_delay = 5
-  ```
+  - `port` - int: The UDP port to use (Default: `514`)
 - `gmail_api`
   - `credentials_file` - str: Path to file containing the
     credentials, None to disable (Default: `None`)
   - `token_file` - str: Path to save the token file
     (Default: `.token`)

 :::{note}
 credentials_file and token_file can be obtained with [quickstart](https://developers.google.com/gmail/api/quickstart/python). Please change the scope to `https://www.googleapis.com/auth/gmail.modify`.
 :::
````

````diff
@@ -494,7 +442,7 @@ Update the limit to 2k per example:
 PUT _cluster/settings
 {
   "persistent" : {
     "cluster.max_shards_per_node" : 2000
   }
 }
 ```
````

*(File diff suppressed because one or more lines are too long.)*

```diff
@@ -751,8 +751,8 @@ def parse_aggregate_report_xml(
     new_report_metadata["report_id"] = report_id
     date_range = report["report_metadata"]["date_range"]

-    begin_ts = int(date_range["begin"].split(".")[0])
-    end_ts = int(date_range["end"].split(".")[0])
+    begin_ts = int(date_range["begin"])
+    end_ts = int(date_range["end"])
     span_seconds = end_ts - begin_ts

     normalize_timespan = span_seconds > normalize_timespan_threshold_hours * 3600
```
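
The `.split(".")[0]` on the 9.1.2 side matters because some reporters emit fractional epoch timestamps, which `int()` rejects; a quick illustration (the sample value is hypothetical):

```python
int("1538204542")                      # fine: 1538204542
int("1538204542.500".split(".")[0])    # fine: 1538204542
int("1538204542.500")                  # raises ValueError
```
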
```diff
@@ -27,6 +27,7 @@ from parsedmarc import (
     gelf,
     get_dmarc_reports_from_mailbox,
     get_dmarc_reports_from_mbox,
+    google_secops,
     kafkaclient,
     loganalytics,
     opensearch,
```

```diff
@@ -284,6 +285,14 @@ def _main():
             except Exception as error_:
                 logger.error("GELF Error: {0}".format(error_.__str__()))

+            try:
+                if opts.google_secops:
+                    events = google_secops_client.save_aggregate_report_to_google_secops(report)
+                    for event in events:
+                        print(event)
+            except Exception as error_:
+                logger.error("Google SecOps Error: {0}".format(error_.__str__()))
+
             try:
                 if opts.webhook_aggregate_url:
                     indent_value = 2 if opts.prettify_json else None
```

```diff
@@ -369,6 +378,14 @@ def _main():
             except Exception as error_:
                 logger.error("GELF Error: {0}".format(error_.__str__()))

+            try:
+                if opts.google_secops:
+                    events = google_secops_client.save_forensic_report_to_google_secops(report)
+                    for event in events:
+                        print(event)
+            except Exception as error_:
+                logger.error("Google SecOps Error: {0}".format(error_.__str__()))
+
             try:
                 if opts.webhook_forensic_url:
                     indent_value = 2 if opts.prettify_json else None
```

```diff
@@ -454,6 +471,14 @@ def _main():
             except Exception as error_:
                 logger.error("GELF Error: {0}".format(error_.__str__()))

+            try:
+                if opts.google_secops:
+                    events = google_secops_client.save_smtp_tls_report_to_google_secops(report)
+                    for event in events:
+                        print(event)
+            except Exception as error_:
+                logger.error("Google SecOps Error: {0}".format(error_.__str__()))
+
             try:
                 if opts.webhook_smtp_tls_url:
                     indent_value = 2 if opts.prettify_json else None
```

```diff
@@ -697,13 +722,6 @@ def _main():
         s3_secret_access_key=None,
         syslog_server=None,
         syslog_port=None,
-        syslog_protocol=None,
-        syslog_cafile_path=None,
-        syslog_certfile_path=None,
-        syslog_keyfile_path=None,
-        syslog_timeout=None,
-        syslog_retry_attempts=None,
-        syslog_retry_delay=None,
         gmail_api_credentials_file=None,
         gmail_api_token_file=None,
         gmail_api_include_spam_trash=False,
```

```diff
@@ -729,6 +747,17 @@ def _main():
         gelf_host=None,
         gelf_port=None,
         gelf_mode=None,
+        google_secops=False,
+        google_secops_include_ruf_payload=False,
+        google_secops_ruf_payload_max_bytes=4096,
+        google_secops_static_observer_name=None,
+        google_secops_static_observer_vendor="parsedmarc",
+        google_secops_static_environment=None,
+        google_secops_api_credentials_file=None,
+        google_secops_api_customer_id=None,
+        google_secops_api_region="us",
+        google_secops_api_log_type="DMARC",
+        google_secops_use_stdout=False,
         webhook_aggregate_url=None,
         webhook_forensic_url=None,
         webhook_smtp_tls_url=None,
```

```diff
@@ -1058,10 +1087,10 @@ def _main():
             opts.elasticsearch_password = elasticsearch_config["password"]
         # Until 8.20
         if "apiKey" in elasticsearch_config:
-            opts.elasticsearch_api_key = elasticsearch_config["apiKey"]
+            opts.elasticsearch_apiKey = elasticsearch_config["apiKey"]
         # Since 8.20
         if "api_key" in elasticsearch_config:
-            opts.elasticsearch_api_key = elasticsearch_config["api_key"]
+            opts.elasticsearch_apiKey = elasticsearch_config["api_key"]

     if "opensearch" in config:
         opensearch_config = config["opensearch"]
```

```diff
@@ -1098,10 +1127,10 @@ def _main():
             opts.opensearch_password = opensearch_config["password"]
         # Until 8.20
         if "apiKey" in opensearch_config:
-            opts.opensearch_api_key = opensearch_config["apiKey"]
+            opts.opensearch_apiKey = opensearch_config["apiKey"]
         # Since 8.20
         if "api_key" in opensearch_config:
-            opts.opensearch_api_key = opensearch_config["api_key"]
+            opts.opensearch_apiKey = opensearch_config["api_key"]

     if "splunk_hec" in config.sections():
         hec_config = config["splunk_hec"]
```

```diff
@@ -1246,28 +1275,6 @@ def _main():
             opts.syslog_port = syslog_config["port"]
         else:
             opts.syslog_port = 514
-        if "protocol" in syslog_config:
-            opts.syslog_protocol = syslog_config["protocol"]
-        else:
-            opts.syslog_protocol = "udp"
-        if "cafile_path" in syslog_config:
-            opts.syslog_cafile_path = syslog_config["cafile_path"]
-        if "certfile_path" in syslog_config:
-            opts.syslog_certfile_path = syslog_config["certfile_path"]
-        if "keyfile_path" in syslog_config:
-            opts.syslog_keyfile_path = syslog_config["keyfile_path"]
-        if "timeout" in syslog_config:
-            opts.syslog_timeout = float(syslog_config["timeout"])
-        else:
-            opts.syslog_timeout = 5.0
-        if "retry_attempts" in syslog_config:
-            opts.syslog_retry_attempts = int(syslog_config["retry_attempts"])
-        else:
-            opts.syslog_retry_attempts = 3
-        if "retry_delay" in syslog_config:
-            opts.syslog_retry_delay = int(syslog_config["retry_delay"])
-        else:
-            opts.syslog_retry_delay = 5

     if "gmail_api" in config.sections():
         gmail_api_config = config["gmail_api"]
```

```diff
@@ -1330,6 +1337,50 @@ def _main():
             logger.critical("mode setting missing from the gelf config section")
             exit(-1)

+    if "google_secops" in config.sections():
+        google_secops_config = config["google_secops"]
+        opts.google_secops = True
+        if "include_ruf_payload" in google_secops_config:
+            opts.google_secops_include_ruf_payload = bool(
+                google_secops_config.getboolean("include_ruf_payload")
+            )
+        if "ruf_payload_max_bytes" in google_secops_config:
+            opts.google_secops_ruf_payload_max_bytes = google_secops_config.getint(
+                "ruf_payload_max_bytes"
+            )
+        if "static_observer_name" in google_secops_config:
+            opts.google_secops_static_observer_name = google_secops_config[
+                "static_observer_name"
+            ]
+        if "static_observer_vendor" in google_secops_config:
+            opts.google_secops_static_observer_vendor = google_secops_config[
+                "static_observer_vendor"
+            ]
+        if "static_environment" in google_secops_config:
+            opts.google_secops_static_environment = google_secops_config[
+                "static_environment"
+            ]
+        if "api_credentials_file" in google_secops_config:
+            opts.google_secops_api_credentials_file = google_secops_config[
+                "api_credentials_file"
+            ]
+        if "api_customer_id" in google_secops_config:
+            opts.google_secops_api_customer_id = google_secops_config[
+                "api_customer_id"
+            ]
+        if "api_region" in google_secops_config:
+            opts.google_secops_api_region = google_secops_config[
+                "api_region"
+            ]
+        if "api_log_type" in google_secops_config:
+            opts.google_secops_api_log_type = google_secops_config[
+                "api_log_type"
+            ]
+        if "use_stdout" in google_secops_config:
+            opts.google_secops_use_stdout = google_secops_config.getboolean(
+                "use_stdout"
+            )
+
     if "webhook" in config.sections():
         webhook_config = config["webhook"]
         if "aggregate_url" in webhook_config:
```

```diff
@@ -1465,17 +1516,6 @@ def _main():
             syslog_client = syslog.SyslogClient(
                 server_name=opts.syslog_server,
                 server_port=int(opts.syslog_port),
-                protocol=opts.syslog_protocol or "udp",
-                cafile_path=opts.syslog_cafile_path,
-                certfile_path=opts.syslog_certfile_path,
-                keyfile_path=opts.syslog_keyfile_path,
-                timeout=opts.syslog_timeout if opts.syslog_timeout is not None else 5.0,
-                retry_attempts=opts.syslog_retry_attempts
-                if opts.syslog_retry_attempts is not None
-                else 3,
-                retry_delay=opts.syslog_retry_delay
-                if opts.syslog_retry_delay is not None
-                else 5,
             )
         except Exception as error_:
             logger.error("Syslog Error: {0}".format(error_.__str__()))
```

```diff
@@ -1519,6 +1559,23 @@ def _main():
         except Exception as error_:
             logger.error("GELF Error: {0}".format(error_.__str__()))

+    if opts.google_secops:
+        try:
+            google_secops_client = google_secops.GoogleSecOpsClient(
+                include_ruf_payload=opts.google_secops_include_ruf_payload,
+                ruf_payload_max_bytes=opts.google_secops_ruf_payload_max_bytes,
+                static_observer_name=opts.google_secops_static_observer_name,
+                static_observer_vendor=opts.google_secops_static_observer_vendor,
+                static_environment=opts.google_secops_static_environment,
+                api_credentials_file=opts.google_secops_api_credentials_file,
+                api_customer_id=opts.google_secops_api_customer_id,
+                api_region=opts.google_secops_api_region,
+                api_log_type=opts.google_secops_api_log_type,
+                use_stdout=opts.google_secops_use_stdout,
+            )
+        except Exception as error_:
+            logger.error("Google SecOps Error: {0}".format(error_.__str__()))
+
     if (
         opts.webhook_aggregate_url
         or opts.webhook_forensic_url
```

```diff
@@ -1,3 +1,3 @@
-__version__ = "9.1.2"
+__version__ = "9.0.9"

 USER_AGENT = f"parsedmarc/{__version__}"
```

```diff
@@ -413,8 +413,8 @@ def save_aggregate_report_to_elasticsearch(
     org_name_query = Q(dict(match_phrase=dict(org_name=org_name)))  # type: ignore
     report_id_query = Q(dict(match_phrase=dict(report_id=report_id)))  # pyright: ignore[reportArgumentType]
     domain_query = Q(dict(match_phrase={"published_policy.domain": domain}))  # pyright: ignore[reportArgumentType]
-    begin_date_query = Q(dict(range=dict(date_begin=dict(gte=begin_date))))  # pyright: ignore[reportArgumentType]
-    end_date_query = Q(dict(range=dict(date_end=dict(lte=end_date))))  # pyright: ignore[reportArgumentType]
+    begin_date_query = Q(dict(match=dict(date_begin=begin_date)))  # pyright: ignore[reportArgumentType]
+    end_date_query = Q(dict(match=dict(date_end=end_date)))  # pyright: ignore[reportArgumentType]

     if index_suffix is not None:
         search_index = "dmarc_aggregate_{0}*".format(index_suffix)
```
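
On the 9.1.2 side, duplicate detection queries a date window instead of exact timestamps. A sketch of what that `Q()` call renders to with elasticsearch-dsl (the date value is illustrative):

```python
from elasticsearch_dsl import Q

begin_date_query = Q(dict(range=dict(date_begin=dict(gte="2018-06-19 00:00:00"))))
print(begin_date_query.to_dict())
# {'range': {'date_begin': {'gte': '2018-06-19 00:00:00'}}}
```
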
parsedmarc/google_secops.py (new file; 709 lines)

```python
# -*- coding: utf-8 -*-

"""Google SecOps (Chronicle) output module for parsedmarc"""

from __future__ import annotations

import json
from datetime import datetime, timezone
from typing import Any, Optional

import requests
from google.auth.transport.requests import Request
from google.oauth2 import service_account

from parsedmarc.constants import USER_AGENT
from parsedmarc.log import logger
from parsedmarc.utils import human_timestamp_to_datetime


class GoogleSecOpsError(RuntimeError):
    """Raised when a Google SecOps API error occurs"""


class GoogleSecOpsClient:
    """A client for Google SecOps (Chronicle) UDM output"""

    def __init__(
        self,
        include_ruf_payload: bool = False,
        ruf_payload_max_bytes: int = 4096,
        static_observer_name: Optional[str] = None,
        static_observer_vendor: str = "parsedmarc",
        static_environment: Optional[str] = None,
        api_credentials_file: Optional[str] = None,
        api_customer_id: Optional[str] = None,
        api_region: str = "us",
        api_log_type: str = "DMARC",
        use_stdout: bool = False,
    ):
        """
        Initializes the GoogleSecOpsClient

        Args:
            include_ruf_payload: Include RUF message payload in output
            ruf_payload_max_bytes: Maximum bytes of RUF payload to include
            static_observer_name: Static observer name for telemetry
            static_observer_vendor: Static observer vendor (default: parsedmarc)
            static_environment: Static environment (prod/dev/custom string)
            api_credentials_file: Path to Google service account JSON credentials
            api_customer_id: Chronicle customer ID (required for API)
            api_region: Chronicle region (us, europe, asia-southeast1, etc.)
            api_log_type: Log type for Chronicle ingestion (default: DMARC)
            use_stdout: Output to stdout instead of API (default: False)
        """
        self.include_ruf_payload = include_ruf_payload
        self.ruf_payload_max_bytes = ruf_payload_max_bytes
        self.static_observer_name = static_observer_name
        self.static_observer_vendor = static_observer_vendor
        self.static_environment = static_environment
        self.use_stdout = use_stdout

        # API configuration
        self.api_credentials_file = api_credentials_file
        self.api_customer_id = api_customer_id
        self.api_region = api_region
        self.api_log_type = api_log_type
        self.credentials = None
        self.session = None

        # Initialize API client if not using stdout
        if not self.use_stdout:
            if not self.api_credentials_file or not self.api_customer_id:
                raise GoogleSecOpsError(
                    "api_credentials_file and api_customer_id are required when not using stdout. "
                    "Set use_stdout=True to output to stdout instead."
                )
            self._initialize_api_client()

    def _initialize_api_client(self):
        """Initialize the Chronicle API client with authentication"""
        try:
            logger.debug("Initializing Chronicle API client")

            # Load service account credentials
            self.credentials = service_account.Credentials.from_service_account_file(
                self.api_credentials_file,
                scopes=["https://www.googleapis.com/auth/cloud-platform"],
            )

            # Create session with authentication
            self.session = requests.Session()
            self.session.headers.update({"User-Agent": USER_AGENT})

            logger.info("Chronicle API client initialized successfully")
        except Exception as e:
            raise GoogleSecOpsError(f"Failed to initialize Chronicle API client: {e}")

    def _get_api_endpoint(self) -> str:
        """Get the Chronicle Ingestion API endpoint based on region"""
        return f"https://{self.api_region}-chronicle.googleapis.com/v1alpha/projects/{self.api_customer_id}/locations/{self.api_region}/instances/default/logTypes/{self.api_log_type}/logs:import"

    def _send_events_to_api(self, events: list[str]) -> None:
        """
        Send UDM events to Chronicle Ingestion API

        Args:
            events: List of NDJSON event strings
        """
        if not events:
            return

        try:
            # Refresh credentials if needed
            if not self.credentials.valid:
                self.credentials.refresh(Request())

            # Prepare request
            endpoint = self._get_api_endpoint()
            headers = {
                "Authorization": f"Bearer {self.credentials.token}",
                "Content-Type": "application/json",
            }

            # Chronicle expects events in inline_source format
            # Each event should be a separate log entry
            payload = {
                "inline_source": {
                    "logs": [json.loads(event) for event in events]
                }
            }

            logger.debug(f"Sending {len(events)} events to Chronicle API")

            response = self.session.post(
                endpoint,
                headers=headers,
                json=payload,
                timeout=60,
            )

            if response.status_code == 200:
                logger.info(f"Successfully sent {len(events)} events to Chronicle")
            else:
                error_msg = f"Chronicle API error: {response.status_code} - {response.text}"
                logger.error(error_msg)
                raise GoogleSecOpsError(error_msg)

        except GoogleSecOpsError:
            raise
        except Exception as e:
            raise GoogleSecOpsError(f"Failed to send events to Chronicle API: {e}")

    def _output_events(self, events: list[str]) -> None:
        """
        Output events either to stdout or Chronicle API

        Args:
            events: List of NDJSON event strings
        """
        if self.use_stdout:
            # Output to stdout for collection by external log shippers
            for event in events:
                print(event)
        else:
            # Send directly to Chronicle API
            self._send_events_to_api(events)

    def _get_severity(self, disposition: str, spf_aligned: bool, dkim_aligned: bool) -> str:
        """
        Derive severity from DMARC disposition and alignment

        Args:
            disposition: DMARC policy disposition
            spf_aligned: Whether SPF is aligned
            dkim_aligned: Whether DKIM is aligned

        Returns:
            Severity level: HIGH, MEDIUM, or LOW
        """
        if disposition == "reject":
            return "HIGH"
        elif disposition == "quarantine" and not (spf_aligned or dkim_aligned):
            return "MEDIUM"
        else:
            return "LOW"

    def _get_description(
        self,
        dmarc_pass: bool,
        spf_result: Optional[str],
        dkim_result: Optional[str],
        spf_aligned: bool,
        dkim_aligned: bool,
        disposition: str,
    ) -> str:
        """
        Generate description for the event

        Args:
            dmarc_pass: Whether DMARC passed
            spf_result: SPF result
            dkim_result: DKIM result
            spf_aligned: Whether SPF is aligned
            dkim_aligned: Whether DKIM is aligned
            disposition: DMARC disposition

        Returns:
            Human-readable description
        """
        parts = []

        if dmarc_pass:
            parts.append("DMARC pass")
        else:
            parts.append("DMARC fail")

        if spf_result:
            parts.append(f"SPF={spf_result}")
        if dkim_result:
            parts.append(f"DKIM={dkim_result}")

        if spf_aligned:
            parts.append("SPF aligned")
        elif spf_result:
            parts.append("SPF not aligned")

        if dkim_aligned:
            parts.append("DKIM aligned")
        elif dkim_result:
            parts.append("DKIM not aligned")

        parts.append(f"disposition={disposition}")

        return "; ".join(parts)

    def _format_timestamp(self, timestamp_str: str) -> str:
        """
        Convert parsedmarc timestamp to RFC 3339 format

        Args:
            timestamp_str: Timestamp string from parsedmarc

        Returns:
            RFC 3339 formatted timestamp
        """
        try:
            dt = human_timestamp_to_datetime(timestamp_str)
            # Ensure timezone-aware datetime
            if dt.tzinfo is None:
                dt = dt.replace(tzinfo=timezone.utc)
            return dt.isoformat()
        except Exception:
            # Fallback to current time if parsing fails
            return datetime.now(timezone.utc).isoformat()

    def save_aggregate_report_to_google_secops(
        self, aggregate_report: dict[str, Any]
    ) -> list[str]:
        """
        Convert aggregate DMARC report to Google SecOps UDM format and send to Chronicle

        When use_stdout=False: Events are sent to Chronicle API, returns empty list
        When use_stdout=True: Returns list of NDJSON event strings for stdout

        Args:
            aggregate_report: Aggregate report dictionary from parsedmarc

        Returns:
            List of NDJSON event strings (empty if sent to API)
        """
        logger.debug("Converting aggregate report to Google SecOps UDM format")
        events = []

        try:
            report_metadata = aggregate_report["report_metadata"]
            policy_published = aggregate_report["policy_published"]

            for record in aggregate_report["records"]:
                # Extract values
                source_ip = record["source"]["ip_address"]
                source_country = record["source"].get("country")
                source_reverse_dns = record["source"].get("reverse_dns")
                source_base_domain = record["source"].get("base_domain")
                source_name = record["source"].get("name")
                source_type = record["source"].get("type")

                header_from = record["identifiers"]["header_from"]
                envelope_from = record["identifiers"]["envelope_from"]

                disposition = record["policy_evaluated"]["disposition"]
                spf_aligned = record["alignment"]["spf"]
                dkim_aligned = record["alignment"]["dkim"]
                dmarc_pass = record["alignment"]["dmarc"]

                count = record["count"]
                interval_begin = record["interval_begin"]

                # Get auth results
                spf_results = record["auth_results"].get("spf", [])
                dkim_results = record["auth_results"].get("dkim", [])

                spf_result = spf_results[0]["result"] if spf_results else None
                dkim_result = dkim_results[0]["result"] if dkim_results else None

                # Build UDM event
                event: dict[str, Any] = {
                    "event_type": "DMARC_AGGREGATE",
                    "metadata": {
                        "event_timestamp": self._format_timestamp(interval_begin),
                        "event_type": "GENERIC_EVENT",
                        "product_name": "parsedmarc",
                        "vendor_name": self.static_observer_vendor,
                    },
                    "principal": {
                        "ip": [source_ip],
                    },
                    "target": {
                        "domain": {
                            "name": header_from,
                        }
                    },
                    "security_result": [
                        {
                            "severity": self._get_severity(
                                disposition, spf_aligned, dkim_aligned
                            ),
                            "description": self._get_description(
                                dmarc_pass,
                                spf_result,
                                dkim_result,
                                spf_aligned,
                                dkim_aligned,
                                disposition,
                            ),
                            "detection_fields": [
                                {"key": "dmarc.disposition", "value": disposition},
                                {"key": "dmarc.policy", "value": policy_published["p"]},
                                {"key": "dmarc.pass", "value": dmarc_pass},
                                {"key": "dmarc.spf_aligned", "value": spf_aligned},
                                {"key": "dmarc.dkim_aligned", "value": dkim_aligned},
                                {"key": "dmarc.header_from", "value": header_from},
                                {"key": "dmarc.envelope_from", "value": envelope_from},
                                {"key": "dmarc.report_org", "value": report_metadata["org_name"]},
                                {"key": "dmarc.report_id", "value": report_metadata["report_id"]},
                                {"key": "dmarc.report_begin", "value": report_metadata["begin_date"]},
                                {"key": "dmarc.report_end", "value": report_metadata["end_date"]},
                                {"key": "dmarc.row_count", "value": count},
                            ],
                        }
                    ],
                }

                # Add optional fields to detection_fields
                if spf_result:
                    event["security_result"][0]["detection_fields"].append(
                        {"key": "dmarc.spf_result", "value": spf_result}
                    )
                if dkim_result:
                    event["security_result"][0]["detection_fields"].append(
                        {"key": "dmarc.dkim_result", "value": dkim_result}
                    )
                if source_name:
                    event["security_result"][0]["detection_fields"].append(
                        {"key": "dmarc.source_service_name", "value": source_name}
                    )
                if source_type:
                    event["security_result"][0]["detection_fields"].append(
                        {"key": "dmarc.source_service_type", "value": source_type}
                    )

                # Add optional context fields (low-value, not for dashboarding)
                additional_context = []

                if source_base_domain:
                    additional_context.append(
                        {"key": "source_base_domain", "value": source_base_domain}
                    )

                if self.static_environment:
                    additional_context.append(
                        {"key": "environment", "value": self.static_environment}
                    )

                # Add SPF auth results to context
                if spf_results:
                    for idx, spf in enumerate(spf_results):
                        additional_context.append(
                            {"key": f"spf_{idx}_domain", "value": spf.get("domain", "")}
                        )
                        additional_context.append(
                            {"key": f"spf_{idx}_result", "value": spf.get("result", "")}
                        )

                # Add DKIM auth results to context
                if dkim_results:
                    for idx, dkim in enumerate(dkim_results):
                        additional_context.append(
                            {"key": f"dkim_{idx}_domain", "value": dkim.get("domain", "")}
                        )
                        additional_context.append(
                            {"key": f"dkim_{idx}_result", "value": dkim.get("result", "")}
                        )

                # Only add additional section if there's context to include
                if additional_context:
                    event["additional"] = {"fields": additional_context}

                # Add optional UDM fields
                if source_country:
                    event["principal"]["location"] = {"country_or_region": source_country}

                if source_reverse_dns:
                    event["principal"]["hostname"] = source_reverse_dns

                if self.static_observer_name:
                    event["metadata"]["product_deployment_id"] = self.static_observer_name

                events.append(json.dumps(event, ensure_ascii=False))

        except Exception as e:
            logger.error(f"Error converting aggregate report to Google SecOps format: {e}")
            # Generate error event
            error_event: dict[str, Any] = {
                "event_type": "DMARC_PARSE_ERROR",
                "metadata": {
                    "event_timestamp": datetime.now(timezone.utc).isoformat(),
                    "event_type": "GENERIC_EVENT",
                    "product_name": "parsedmarc",
                    "vendor_name": self.static_observer_vendor,
                },
                "security_result": [
                    {
                        "severity": "ERROR",
                        "description": f"Failed to parse DMARC aggregate report: {str(e)}",
                    }
                ],
            }
            events.append(json.dumps(error_event, ensure_ascii=False))

        # Output events (to stdout or API)
        self._output_events(events)

        # Return events only if using stdout (for CLI to print)
        return events if self.use_stdout else []
    def save_forensic_report_to_google_secops(
        self, forensic_report: dict[str, Any]
    ) -> list[str]:
        """
        Convert forensic DMARC report to Google SecOps UDM format and send to Chronicle

        When use_stdout=False: Events are sent to Chronicle API, returns empty list
        When use_stdout=True: Returns list of NDJSON event strings for stdout

        Args:
            forensic_report: Forensic report dictionary from parsedmarc

        Returns:
            List of NDJSON event strings (empty if sent to API)
        """
        logger.debug("Converting forensic report to Google SecOps UDM format")
        events = []

        try:
            source_ip = forensic_report["source"]["ip_address"]
            source_country = forensic_report["source"].get("country")
            source_reverse_dns = forensic_report["source"].get("reverse_dns")
            source_base_domain = forensic_report["source"].get("base_domain")
            source_name = forensic_report["source"].get("name")
            source_type = forensic_report["source"].get("type")

            reported_domain = forensic_report["reported_domain"]
            arrival_date = forensic_report["arrival_date_utc"]
            auth_failure = forensic_report.get("auth_failure", [])

            # Determine severity - forensic reports indicate failures
            # Default to MEDIUM for authentication failures
            severity = "MEDIUM"

            # Build description
            auth_failure_str = ", ".join(auth_failure) if auth_failure else "unknown"
            description = f"DMARC forensic report: authentication failure ({auth_failure_str})"

            # Build UDM event
            event: dict[str, Any] = {
                "event_type": "DMARC_FORENSIC",
                "metadata": {
                    "event_timestamp": self._format_timestamp(arrival_date),
                    "event_type": "GENERIC_EVENT",
                    "product_name": "parsedmarc",
                    "vendor_name": self.static_observer_vendor,
                },
                "principal": {
                    "ip": [source_ip],
                },
                "target": {
                    "domain": {
                        "name": reported_domain,
                    }
                },
                "security_result": [
                    {
                        "severity": severity,
                        "description": description,
                        "detection_fields": [
                            {"key": "dmarc.auth_failure", "value": auth_failure_str},
                            {"key": "dmarc.reported_domain", "value": reported_domain},
                        ],
                    }
                ],
            }

            # Add optional fields to detection_fields (matching aggregate report field names)
            if source_name:
                event["security_result"][0]["detection_fields"].append(
                    {"key": "dmarc.source_service_name", "value": source_name}
                )
            if source_type:
                event["security_result"][0]["detection_fields"].append(
                    {"key": "dmarc.source_service_type", "value": source_type}
                )

            # Add optional context fields (low-value, not for dashboarding)
            additional_context = []

            if source_base_domain:
                additional_context.append(
                    {"key": "source_base_domain", "value": source_base_domain}
                )

            if forensic_report.get("feedback_type"):
                additional_context.append(
                    {"key": "feedback_type", "value": forensic_report["feedback_type"]}
                )

            if forensic_report.get("message_id"):
                additional_context.append(
                    {"key": "message_id", "value": forensic_report["message_id"]}
                )

            if forensic_report.get("authentication_results"):
                additional_context.append(
                    {"key": "authentication_results", "value": forensic_report["authentication_results"]}
                )

            if forensic_report.get("delivery_result"):
                additional_context.append(
                    {"key": "delivery_result", "value": forensic_report["delivery_result"]}
                )

            if self.static_environment:
                additional_context.append(
                    {"key": "environment", "value": self.static_environment}
                )

            # Add payload excerpt if enabled
            if self.include_ruf_payload and forensic_report.get("sample"):
                sample = forensic_report["sample"]
                if len(sample) > self.ruf_payload_max_bytes:
                    sample = sample[:self.ruf_payload_max_bytes] + "... [truncated]"
                additional_context.append(
                    {"key": "message_sample", "value": sample}
                )

            # Only add additional section if there's context to include
            if additional_context:
                event["additional"] = {"fields": additional_context}

            # Add optional UDM fields
            if source_country:
                event["principal"]["location"] = {"country_or_region": source_country}

            if source_reverse_dns:
                event["principal"]["hostname"] = source_reverse_dns

            if self.static_observer_name:
                event["metadata"]["product_deployment_id"] = self.static_observer_name

            events.append(json.dumps(event, ensure_ascii=False))

        except Exception as e:
            logger.error(f"Error converting forensic report to Google SecOps format: {e}")
            # Generate error event
            error_event: dict[str, Any] = {
                "event_type": "DMARC_PARSE_ERROR",
                "metadata": {
                    "event_timestamp": datetime.now(timezone.utc).isoformat(),
                    "event_type": "GENERIC_EVENT",
                    "product_name": "parsedmarc",
                    "vendor_name": self.static_observer_vendor,
                },
                "security_result": [
                    {
                        "severity": "ERROR",
                        "description": f"Failed to parse DMARC forensic report: {str(e)}",
                    }
                ],
            }
            events.append(json.dumps(error_event, ensure_ascii=False))

        # Output events (to stdout or API)
        self._output_events(events)

        # Return events only if using stdout (for CLI to print)
        return events if self.use_stdout else []

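A quick sanity check of the payload truncation above: the "... [truncated]" suffix is 15 characters, so a stored excerpt is at most ruf_payload_max_bytes + 15 characters. A minimal sketch with assumed values (the 5000-character sample is invented for illustration; 4096 is the client default asserted in the tests below):

sample = "x" * 5000
max_bytes = 4096  # default ruf_payload_max_bytes
if len(sample) > max_bytes:
    sample = sample[:max_bytes] + "... [truncated]"
assert len(sample) == max_bytes + 15  # len("... [truncated]") == 15
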
    def save_smtp_tls_report_to_google_secops(
        self, smtp_tls_report: dict[str, Any]
    ) -> list[str]:
        """
        Convert SMTP TLS report to Google SecOps UDM format and send to Chronicle

        When use_stdout=False: Events are sent to Chronicle API, returns empty list
        When use_stdout=True: Returns list of NDJSON event strings for stdout

        Args:
            smtp_tls_report: SMTP TLS report dictionary from parsedmarc

        Returns:
            List of NDJSON event strings (empty if sent to API)
        """
        logger.debug("Converting SMTP TLS report to Google SecOps UDM format")
        events = []

        try:
            organization_name = smtp_tls_report.get("organization_name", "")
            begin_date = smtp_tls_report["begin_date"]
            end_date = smtp_tls_report["end_date"]

            for policy in smtp_tls_report.get("policies", []):
                policy_domain = policy["policy_domain"]

                for failure in policy.get("failure_details", []):
                    # Build UDM event for each failure
                    event: dict[str, Any] = {
                        "event_type": "SMTP_TLS_REPORT",
                        "metadata": {
                            "event_timestamp": self._format_timestamp(begin_date),
                            "event_type": "GENERIC_EVENT",
                            "product_name": "parsedmarc",
                            "vendor_name": self.static_observer_vendor,
                        },
                        "target": {
                            "domain": {
                                "name": policy_domain,
                            }
                        },
                        "security_result": [
                            {
                                "severity": "LOW",
                                "description": f"SMTP TLS failure: {failure.get('result_type', 'unknown')}",
                                "detection_fields": [
                                    {"key": "smtp_tls.policy_domain", "value": policy_domain},
                                    {"key": "smtp_tls.result_type", "value": failure.get("result_type", "")},
                                    {"key": "smtp_tls.failed_session_count", "value": failure.get("failed_session_count", 0)},
                                    {"key": "smtp_tls.report_org", "value": organization_name},
                                    {"key": "smtp_tls.report_begin", "value": begin_date},
                                    {"key": "smtp_tls.report_end", "value": end_date},
                                ],
                            }
                        ],
                    }

                    # Add optional context fields (low-value, not for dashboarding)
                    additional_context = []

                    if self.static_environment:
                        additional_context.append(
                            {"key": "environment", "value": self.static_environment}
                        )

                    # Only add additional section if there's context to include
                    if additional_context:
                        event["additional"] = {"fields": additional_context}

                    # Add optional UDM fields
                    if failure.get("sending_mta_ip"):
                        event["principal"] = {"ip": [failure["sending_mta_ip"]]}

                    if self.static_observer_name:
                        event["metadata"]["product_deployment_id"] = self.static_observer_name

                    events.append(json.dumps(event, ensure_ascii=False))

        except Exception as e:
            logger.error(f"Error converting SMTP TLS report to Google SecOps format: {e}")
            # Generate error event
            error_event: dict[str, Any] = {
                "event_type": "DMARC_PARSE_ERROR",
                "metadata": {
                    "event_timestamp": datetime.now(timezone.utc).isoformat(),
                    "event_type": "GENERIC_EVENT",
                    "product_name": "parsedmarc",
                    "vendor_name": self.static_observer_vendor,
                },
                "security_result": [
                    {
                        "severity": "ERROR",
                        "description": f"Failed to parse SMTP TLS report: {str(e)}",
                    }
                ],
            }
            events.append(json.dumps(error_event, ensure_ascii=False))

        # Output events (to stdout or API)
        self._output_events(events)

        # Return events only if using stdout (for CLI to print)
        return events if self.use_stdout else []

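All three converters share the same calling pattern, so a single usage sketch covers them. This mirrors the tests further down and uses only names that appear in this diff; in stdout mode the methods return NDJSON strings instead of POSTing them to the Chronicle API:

import json

import parsedmarc
from parsedmarc.google_secops import GoogleSecOpsClient

client = GoogleSecOpsClient(use_stdout=True)  # return events instead of sending
report = parsedmarc.parse_report_file(
    "samples/aggregate/example.net!example.com!1529366400!1529452799.xml",
    always_use_local_files=True,
)["report"]
for line in client.save_aggregate_report_to_google_secops(report):
    print(json.loads(line)["event_type"])  # "DMARC_AGGREGATE"
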
@@ -413,8 +413,8 @@ def save_aggregate_report_to_opensearch(
    org_name_query = Q(dict(match_phrase=dict(org_name=org_name)))
    report_id_query = Q(dict(match_phrase=dict(report_id=report_id)))
    domain_query = Q(dict(match_phrase={"published_policy.domain": domain}))
    begin_date_query = Q(dict(range=dict(date_begin=dict(gte=begin_date))))
    end_date_query = Q(dict(range=dict(date_end=dict(lte=end_date))))
    begin_date_query = Q(dict(match=dict(date_begin=begin_date)))
    end_date_query = Q(dict(match=dict(date_end=end_date)))

    if index_suffix is not None:
        search_index = "dmarc_aggregate_{0}*".format(index_suffix)

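The change above replaces the two range clauses with exact match clauses, so the duplicate check now looks for a report with the same begin and end timestamps rather than any report overlapping that window. A sketch of how such Q objects are typically AND-ed before the search runs; the Search object and index pattern are assumptions (the diff only shows the clause definitions), though the Q/Search DSL used here does ship with opensearch-py:

from opensearchpy import Q, Search  # DSL included in opensearch-py >= 2.2

search = Search(index="dmarc_aggregate*")  # assumed index pattern
# & folds the clauses into a single bool/must query
query = (
    org_name_query
    & report_id_query
    & domain_query
    & begin_date_query
    & end_date_query
)
existing = search.query(query).execute()
if len(existing.hits) > 0:
    pass  # an identical report is already indexed; skip saving a duplicate
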
@@ -6,10 +6,7 @@ from __future__ import annotations
import json
import logging
import logging.handlers
import socket
import ssl
import time
from typing import Any, Optional
from typing import Any

from parsedmarc import (
    parsed_aggregate_reports_to_csv_rows,

@@ -21,150 +18,20 @@ from parsedmarc import (
class SyslogClient(object):
    """A client for Syslog"""

    def __init__(
        self,
        server_name: str,
        server_port: int,
        protocol: str = "udp",
        cafile_path: Optional[str] = None,
        certfile_path: Optional[str] = None,
        keyfile_path: Optional[str] = None,
        timeout: float = 5.0,
        retry_attempts: int = 3,
        retry_delay: int = 5,
    ):
    def __init__(self, server_name: str, server_port: int):
        """
        Initializes the SyslogClient
        Args:
            server_name (str): The Syslog server
            server_port (int): The Syslog port
            protocol (str): The protocol to use: "udp", "tcp", or "tls" (Default: "udp")
            cafile_path (str): Path to CA certificate file for TLS server verification (Optional)
            certfile_path (str): Path to client certificate file for TLS authentication (Optional)
            keyfile_path (str): Path to client private key file for TLS authentication (Optional)
            timeout (float): Connection timeout in seconds for TCP/TLS (Default: 5.0)
            retry_attempts (int): Number of retry attempts for failed connections (Default: 3)
            retry_delay (int): Delay in seconds between retry attempts (Default: 5)
            server_port (int): The Syslog UDP port
        """
        self.server_name = server_name
        self.server_port = server_port
        self.protocol = protocol.lower()
        self.timeout = timeout
        self.retry_attempts = retry_attempts
        self.retry_delay = retry_delay

        self.logger = logging.getLogger("parsedmarc_syslog")
        self.logger.setLevel(logging.INFO)

        # Create the appropriate syslog handler based on protocol
        log_handler = self._create_syslog_handler(
            server_name,
            server_port,
            self.protocol,
            cafile_path,
            certfile_path,
            keyfile_path,
            timeout,
            retry_attempts,
            retry_delay,
        )

        log_handler = logging.handlers.SysLogHandler(address=(server_name, server_port))
        self.logger.addHandler(log_handler)

    def _create_syslog_handler(
        self,
        server_name: str,
        server_port: int,
        protocol: str,
        cafile_path: Optional[str],
        certfile_path: Optional[str],
        keyfile_path: Optional[str],
        timeout: float,
        retry_attempts: int,
        retry_delay: int,
    ) -> logging.handlers.SysLogHandler:
        """
        Creates a SysLogHandler with the specified protocol and TLS settings
        """
        if protocol == "udp":
            # UDP protocol (default, backward compatible)
            return logging.handlers.SysLogHandler(
                address=(server_name, server_port),
                socktype=socket.SOCK_DGRAM,
            )
        elif protocol in ["tcp", "tls"]:
            # TCP or TLS protocol with retry logic
            for attempt in range(1, retry_attempts + 1):
                try:
                    if protocol == "tcp":
                        # TCP without TLS
                        handler = logging.handlers.SysLogHandler(
                            address=(server_name, server_port),
                            socktype=socket.SOCK_STREAM,
                        )
                        # Set timeout on the socket
                        if hasattr(handler, "socket") and handler.socket:
                            handler.socket.settimeout(timeout)
                        return handler
                    else:
                        # TLS protocol
                        # Create SSL context with secure defaults
                        ssl_context = ssl.create_default_context()

                        # Explicitly set minimum TLS version to 1.2 for security
                        ssl_context.minimum_version = ssl.TLSVersion.TLSv1_2

                        # Configure server certificate verification
                        if cafile_path:
                            ssl_context.load_verify_locations(cafile=cafile_path)

                        # Configure client certificate authentication
                        if certfile_path and keyfile_path:
                            ssl_context.load_cert_chain(
                                certfile=certfile_path,
                                keyfile=keyfile_path,
                            )
                        elif certfile_path or keyfile_path:
                            # Warn if only one of the two required parameters is provided
                            self.logger.warning(
                                "Both certfile_path and keyfile_path are required for "
                                "client certificate authentication. Client authentication "
                                "will not be used."
                            )

                        # Create TCP handler first
                        handler = logging.handlers.SysLogHandler(
                            address=(server_name, server_port),
                            socktype=socket.SOCK_STREAM,
                        )

                        # Wrap socket with TLS
                        if hasattr(handler, "socket") and handler.socket:
                            handler.socket = ssl_context.wrap_socket(
                                handler.socket,
                                server_hostname=server_name,
                            )
                            handler.socket.settimeout(timeout)

                        return handler

                except Exception as e:
                    if attempt < retry_attempts:
                        self.logger.warning(
                            f"Syslog connection attempt {attempt}/{retry_attempts} failed: {e}. "
                            f"Retrying in {retry_delay} seconds..."
                        )
                        time.sleep(retry_delay)
                    else:
                        self.logger.error(
                            f"Syslog connection failed after {retry_attempts} attempts: {e}"
                        )
                        raise
        else:
            raise ValueError(
                f"Invalid protocol '{protocol}'. Must be 'udp', 'tcp', or 'tls'."
            )

    def save_aggregate_report_to_syslog(self, aggregate_reports: list[dict[str, Any]]):
        rows = parsed_aggregate_reports_to_csv_rows(aggregate_reports)
        for row in rows:

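For orientation, a minimal sketch of how the simplified client is driven; the host, port, and aggregate_reports list are assumptions, while SyslogClient and save_aggregate_report_to_syslog are the names shown above:

from parsedmarc.syslog import SyslogClient

# after this change the handler is plain UDP again; one message per CSV row
client = SyslogClient(server_name="syslog.example.com", server_port=514)
client.save_aggregate_report_to_syslog(aggregate_reports)
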
@@ -2,7 +2,7 @@ from __future__ import annotations

from typing import Any, Dict, List, Literal, Optional, TypedDict, Union

# NOTE: This module is intentionally Python 3.10 compatible.
# NOTE: This module is intentionally Python 3.9 compatible.
# - No PEP 604 unions (A | B)
# - No typing.NotRequired / Required (3.11+) to avoid an extra dependency.
#   For optional keys, use total=False TypedDicts.

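To make that note concrete, a small illustration of the total=False pattern it prescribes; the type name and keys here are invented for the example, not taken from types.py:

from typing import TypedDict

class SourceInfo(TypedDict, total=False):
    # total=False marks every key optional on Python 3.9,
    # with no need for typing.NotRequired (3.11+)
    country: str
    reverse_dns: str

info: SourceInfo = {"country": "US"}  # omitting reverse_dns type-checks fine
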
@@ -2,7 +2,7 @@
requires = [
    "hatchling>=1.27.0",
]
requires_python = ">=3.10,<3.15"
requires_python = ">=3.9,<3.14"
build-backend = "hatchling.build"

[project]
@@ -29,7 +29,7 @@ classifiers = [
    "Operating System :: OS Independent",
    "Programming Language :: Python :: 3"
]
requires-python = ">=3.10"
requires-python = ">=3.9, <3.14"
dependencies = [
    "azure-identity>=1.8.0",
    "azure-monitor-ingestion>=1.0.0",
@@ -45,10 +45,10 @@ dependencies = [
    "google-auth-httplib2>=0.1.0",
    "google-auth-oauthlib>=0.4.6",
    "google-auth>=2.3.3",
    "imapclient>=3.1.0",
    "imapclient>=2.1.0",
    "kafka-python-ng>=2.2.2",
    "lxml>=4.4.0",
    "mailsuite>=1.11.2",
    "mailsuite>=1.11.1",
    "msgraph-core==0.2.2",
    "opensearch-py>=2.4.2,<=3.0.0",
    "publicsuffixlist>=0.10.0",

183
tests.py
@@ -3,6 +3,7 @@

from __future__ import absolute_import, print_function, unicode_literals

import json
import os
import unittest
from glob import glob
@@ -12,9 +13,6 @@ from lxml import etree
import parsedmarc
import parsedmarc.utils

# Detect if running in GitHub Actions to skip DNS lookups
OFFLINE_MODE = os.environ.get("GITHUB_ACTIONS", "false").lower() == "true"


def minify_xml(xml_string):
    parser = etree.XMLParser(remove_blank_text=True)
@@ -124,7 +122,7 @@ class Test(unittest.TestCase):
                continue
            print("Testing {0}: ".format(sample_path), end="")
            parsed_report = parsedmarc.parse_report_file(
                sample_path, always_use_local_files=True, offline=OFFLINE_MODE
                sample_path, always_use_local_files=True
            )["report"]
            parsedmarc.parsed_aggregate_reports_to_csv(parsed_report)
            print("Passed!")
@@ -132,7 +130,7 @@
    def testEmptySample(self):
        """Test empty/unparsable report"""
        with self.assertRaises(parsedmarc.ParserError):
            parsedmarc.parse_report_file("samples/empty.xml", offline=OFFLINE_MODE)
            parsedmarc.parse_report_file("samples/empty.xml")

    def testForensicSamples(self):
        """Test sample forensic/ruf/failure DMARC reports"""
@@ -142,12 +140,8 @@
            print("Testing {0}: ".format(sample_path), end="")
            with open(sample_path) as sample_file:
                sample_content = sample_file.read()
            parsed_report = parsedmarc.parse_report_email(
                sample_content, offline=OFFLINE_MODE
            )["report"]
            parsed_report = parsedmarc.parse_report_file(
                sample_path, offline=OFFLINE_MODE
            )["report"]
            parsed_report = parsedmarc.parse_report_email(sample_content)["report"]
            parsed_report = parsedmarc.parse_report_file(sample_path)["report"]
            parsedmarc.parsed_forensic_reports_to_csv(parsed_report)
            print("Passed!")

@@ -159,12 +153,173 @@
            if os.path.isdir(sample_path):
                continue
            print("Testing {0}: ".format(sample_path), end="")
            parsed_report = parsedmarc.parse_report_file(
                sample_path, offline=OFFLINE_MODE
            )["report"]
            parsed_report = parsedmarc.parse_report_file(sample_path)["report"]
            parsedmarc.parsed_smtp_tls_reports_to_csv(parsed_report)
            print("Passed!")

    def testGoogleSecOpsAggregateReport(self):
        """Test Google SecOps aggregate report conversion"""
        print()
        from parsedmarc.google_secops import GoogleSecOpsClient

        client = GoogleSecOpsClient(use_stdout=True)
        sample_path = "samples/aggregate/example.net!example.com!1529366400!1529452799.xml"
        print("Testing Google SecOps aggregate conversion for {0}: ".format(sample_path), end="")

        parsed_file = parsedmarc.parse_report_file(sample_path, always_use_local_files=True)
        parsed_report = parsed_file["report"]

        events = client.save_aggregate_report_to_google_secops(parsed_report)

        # Verify we got events
        assert len(events) > 0, "Expected at least one event"

        # Verify each event is valid JSON
        for event in events:
            event_dict = json.loads(event)
            assert "event_type" in event_dict
            assert event_dict["event_type"] == "DMARC_AGGREGATE"
            assert "metadata" in event_dict
            assert "principal" in event_dict
            assert "target" in event_dict
            assert "security_result" in event_dict

        print("Passed!")

    def testGoogleSecOpsForensicReport(self):
        """Test Google SecOps forensic report conversion"""
        print()
        from parsedmarc.google_secops import GoogleSecOpsClient

        # Test without payload
        client = GoogleSecOpsClient(include_ruf_payload=False, use_stdout=True)
        sample_path = "samples/forensic/dmarc_ruf_report_linkedin.eml"
        print("Testing Google SecOps forensic conversion (no payload) for {0}: ".format(sample_path), end="")

        parsed_file = parsedmarc.parse_report_file(sample_path)
        parsed_report = parsed_file["report"]

        events = client.save_forensic_report_to_google_secops(parsed_report)

        # Verify we got events
        assert len(events) > 0, "Expected at least one event"

        # Verify each event is valid JSON
        for event in events:
            event_dict = json.loads(event)
            assert "event_type" in event_dict
            assert event_dict["event_type"] == "DMARC_FORENSIC"

            # Verify no payload in additional fields
            if "additional" in event_dict and "fields" in event_dict["additional"]:
                for field in event_dict["additional"]["fields"]:
                    assert field["key"] != "message_sample", "Payload should not be included when disabled"

        print("Passed!")

        # Test with payload
        client_with_payload = GoogleSecOpsClient(
            include_ruf_payload=True,
            ruf_payload_max_bytes=100,
            use_stdout=True
        )
        print("Testing Google SecOps forensic conversion (with payload) for {0}: ".format(sample_path), end="")

        events_with_payload = client_with_payload.save_forensic_report_to_google_secops(parsed_report)

        # Verify we got events
        assert len(events_with_payload) > 0, "Expected at least one event"

        # Verify payload is included
        for event in events_with_payload:
            event_dict = json.loads(event)

            # Check if message_sample is in additional fields
            has_sample = False
            if "additional" in event_dict and "fields" in event_dict["additional"]:
                for field in event_dict["additional"]["fields"]:
                    if field["key"] == "message_sample":
                        has_sample = True
                        # Verify truncation: max_bytes (100) + "... [truncated]" suffix (15 chars)
                        # Allow some margin for the actual payload length
                        max_expected_length = 100 + len("... [truncated]") + 10
                        assert len(field["value"]) <= max_expected_length, f"Payload should be truncated, got {len(field['value'])} bytes"
                        break

            assert has_sample, "Payload should be included when enabled"

        print("Passed!")

    def testGoogleSecOpsConfiguration(self):
        """Test Google SecOps client configuration"""
        print()
        from parsedmarc.google_secops import GoogleSecOpsClient

        print("Testing Google SecOps client configuration: ", end="")

        # Test stdout configuration
        client1 = GoogleSecOpsClient(use_stdout=True)
        assert client1.include_ruf_payload is False
        assert client1.ruf_payload_max_bytes == 4096
        assert client1.static_observer_vendor == "parsedmarc"
        assert client1.static_observer_name is None
        assert client1.static_environment is None
        assert client1.use_stdout is True

        # Test custom configuration
        client2 = GoogleSecOpsClient(
            include_ruf_payload=True,
            ruf_payload_max_bytes=8192,
            static_observer_name="test-observer",
            static_observer_vendor="test-vendor",
            static_environment="prod",
            use_stdout=True
        )
        assert client2.include_ruf_payload is True
        assert client2.ruf_payload_max_bytes == 8192
        assert client2.static_observer_name == "test-observer"
        assert client2.static_observer_vendor == "test-vendor"
        assert client2.static_environment == "prod"

        print("Passed!")

    def testGoogleSecOpsSmtpTlsReport(self):
        """Test Google SecOps SMTP TLS report conversion"""
        print()
        from parsedmarc.google_secops import GoogleSecOpsClient

        client = GoogleSecOpsClient(use_stdout=True)
        sample_path = "samples/smtp_tls/rfc8460.json"
        print("Testing Google SecOps SMTP TLS conversion for {0}: ".format(sample_path), end="")

        parsed_file = parsedmarc.parse_report_file(sample_path)
        parsed_report = parsed_file["report"]

        events = client.save_smtp_tls_report_to_google_secops(parsed_report)

        # Verify we got events
        assert len(events) > 0, "Expected at least one event"

        # Verify each event is valid JSON
        for event in events:
            event_dict = json.loads(event)
            assert "event_type" in event_dict
            assert event_dict["event_type"] == "SMTP_TLS_REPORT"
            assert "metadata" in event_dict
            assert "target" in event_dict
            assert "security_result" in event_dict

            # Verify failed_session_count is in detection_fields as an integer
            found_count = False
            for field in event_dict["security_result"][0]["detection_fields"]:
                if field["key"] == "smtp_tls.failed_session_count":
                    assert isinstance(field["value"], int), "failed_session_count should be an integer"
                    found_count = True
                    break
            assert found_count, "failed_session_count should be in detection_fields"

        print("Passed!")


if __name__ == "__main__":
    unittest.main(verbosity=2)