Fix JSON type handling and pre-json field init in SecOps parser

This commit accounts for two CBN behaviors, both confirmed against Google's
own "How to parse JSON data" guide (its statedump shows JSON true/199
retaining boolean/integer type) and the published Corelight production parser:

1. The json{} filter preserves the original JSON type, so parsedmarc's boolean
   *_aligned / testing / normalized_timespan and numeric count / *_session_count
   / source_asn would never match string comparisons. Add a mutate{convert} step
   turning them into strings before any == "true"/"false" test or %{...} use.

2. CBN raises _failed_parsing_ when an `if [field]` references a field absent
   from the log, and most detection/mapping fields are absent in 2 of the 3
   report shapes (or null within one). Initialize every conditionally-checked
   field to "" before the json{} filter.

Without these changes, DMARC-fail records would not be categorized as
AUTH_VIOLATION, and aggregate/TLS reports could fail parsing outright. The
README caveat and PR validation steps are updated accordingly.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Sean Whalen
2026-06-04 10:22:02 -04:00
parent 784e3050bd
commit 2d9a2a2a8f
2 changed files with 127 additions and 17 deletions
+11 -7
@@ -40,13 +40,17 @@ DMARC types.
## Caveats
1. **Unvalidated** — see [Status](#status).
-2. **Boolean coercion** — parsedmarc emits `dmarc_aligned`, `spf_aligned`,
-`dkim_aligned`, `testing`, and `normalized_timespan` as JSON booleans. The
-parser assumes the `json{}` filter exposes them as the strings `"true"` /
-`"false"` (the CBN convention) and compares them as such. The security-
-relevant consequence to confirm in the validation tool: a DMARC-fail record
-(`dmarc_aligned=false`) must receive `security_result.category =
-AUTH_VIOLATION`.
+2. **JSON type handling** — parsedmarc emits `dmarc_aligned` / `spf_aligned` /
+`dkim_aligned` / `testing` / `normalized_timespan` as JSON booleans and
+`count` / `*_session_count` / `source_asn` as numbers. Chronicle's `json{}`
+filter **preserves the original JSON type**, so the parser explicitly
+converts these to strings (`mutate { convert => { … => "string" } }`) before
+any comparison — otherwise `[dmarc_aligned] == "false"` would never match.
+Relatedly, every field tested in an `if` is initialized to `""` *before* the
+`json` filter, because CBN raises `_failed_parsing_` on a conditional that
+references a field absent from the log. A DMARC-fail record
+(`dmarc_aligned=false`) should yield `security_result.category =
+AUTH_VIOLATION` — still worth confirming in the validation tool.
3. **Aggregate count** — a DMARC aggregate record summarizes `count` messages
from one source IP, not a single message. Each record becomes one
`EMAIL_TRANSACTION` with `count` carried in `additional.fields`. There is no
+116 -10
@@ -49,12 +49,12 @@ filter {
# 1. UNVALIDATED. This parser was written to the docs above but has not been
# run through the SecOps parser-validation tool against a live tenant.
# Validate with the sample events in README.md before production use.
-# 2. BOOLEAN COERCION. parsedmarc emits *_aligned / testing /
-# normalized_timespan / sample_headers_only as JSON booleans. This parser
-# assumes the json{} filter exposes them as the strings "true"/"false"
-# (the CBN convention) and compares them as such. Confirm in the
-# validation tool that DMARC-fail records (dmarc_aligned=false) receive
-# security_result.category = AUTH_VIOLATION.
+# 2. JSON TYPES ARE PRESERVED. The CBN json{} filter keeps the original JSON
+# type (Google's "How to parse JSON data" shows true staying boolean and
+# 199 staying integer), so parsedmarc's boolean *_aligned / testing /
+# normalized_timespan and numeric count / *_count would NOT match string
+# comparisons. This parser converts them to strings (step 1b) before any
+# `== "true"` / `== "false"` test or %{...} use.
# 3. AGGREGATE COUNT. A DMARC aggregate record summarizes "count" messages
# from one source IP, not a single message. Each becomes one
# EMAIL_TRANSACTION with "count" carried in additional.fields; there is no
@@ -66,18 +66,103 @@ filter {
# ===========================================================================
# ---------------------------------------------------------------------------
# 1. Extract the JSON object from the (possibly syslog-framed) raw line.
# Python's SysLogHandler prepends a "<PRI>" priority (and a forwarder may
# add a timestamp/host/tag), so the JSON is not necessarily at column 0.
# Grab everything from the first "{" to the last "}".
# 1a. Initialize every field used in a conditional check to "" BEFORE the json
# filter. Chronicle's CBN raises _failed_parsing_ when an `if [field]`
# tests a field that does not exist in the log, and most of these fields
# are absent in 2 of the 3 report shapes (or null within one). The json
# filter below overwrites whichever are actually present.
# Ref: thatsiemguy "Parsing 101"; matches Google's default parsers.
# ---------------------------------------------------------------------------
mutate {
replace => {
"report_type" => ""
"event_type" => ""
# report-type detection
"feedback_type" => ""
"policy_type" => ""
"domain" => ""
# aggregate
"report_id" => ""
"org_name" => ""
"org_email" => ""
"begin_date" => ""
"end_date" => ""
"count" => ""
"p" => ""
"sp" => ""
"np" => ""
"pct" => ""
"fo" => ""
"adkim" => ""
"aspf" => ""
"testing" => ""
"discovery_method" => ""
"normalized_timespan" => ""
"dmarc_aligned" => ""
"spf_aligned" => ""
"dkim_aligned" => ""
"disposition" => ""
"dkim_domains" => ""
"dkim_selectors" => ""
"dkim_results" => ""
"spf_domains" => ""
"spf_scopes" => ""
"spf_results" => ""
"policy_override_reasons" => ""
"policy_override_comments" => ""
"source_ip_address" => ""
"source_reverse_dns" => ""
"source_country" => ""
"source_base_domain" => ""
"source_name" => ""
"source_type" => ""
"source_asn" => ""
"source_as_name" => ""
"source_as_domain" => ""
"header_from" => ""
"envelope_from" => ""
"envelope_to" => ""
# failure
"message_id" => ""
"arrival_date_utc" => ""
"arrival_date" => ""
"reported_domain" => ""
"original_mail_from" => ""
"original_rcpt_to" => ""
"subject" => ""
"delivery_result" => ""
"auth_failure" => ""
"authentication_results" => ""
"authentication_mechanisms" => ""
"user_agent" => ""
"dkim_domain" => ""
# smtp tls
"policy_domain" => ""
"receiving_ip" => ""
"sending_mta_ip" => ""
"result_type" => ""
"organization_name" => ""
"policy_strings" => ""
"mx_host_patterns" => ""
"successful_session_count" => ""
"failed_session_count" => ""
"failure_reason_code" => ""
"receiving_mx_hostname" => ""
"receiving_mx_helo" => ""
"additional_info_uri" => ""
}
}
# ---------------------------------------------------------------------------
# 1. Extract the JSON object from the (possibly syslog-framed) raw line.
# Python's SysLogHandler prepends a "<PRI>" priority (and a forwarder may
# add a timestamp/host/tag), so the JSON is not necessarily at column 0.
# Grab everything from the first "{" to the last "}".
# ---------------------------------------------------------------------------
grok {
match => {
"message" => ["^.*?(?P<payload>\\{.*\\})\\s*$"]
@@ -96,6 +181,27 @@ filter {
drop {}
}
# ---------------------------------------------------------------------------
# 1b. Convert parsedmarc's JSON booleans/numbers to strings. The json{} filter
# PRESERVES the original JSON type, so without this, [dmarc_aligned] is a
# boolean and `== "false"` never matches (and %{count} on an int is
# unreliable). Fields left as "" by step 1a convert as a harmless no-op.
# ---------------------------------------------------------------------------
mutate {
convert => {
"dmarc_aligned" => "string"
"spf_aligned" => "string"
"dkim_aligned" => "string"
"testing" => "string"
"normalized_timespan" => "string"
"count" => "string"
"source_asn" => "string"
"successful_session_count" => "string"
"failed_session_count" => "string"
}
on_error => "convert_error"
}
# ---------------------------------------------------------------------------
# 2. Detect the report type from a field that is always present, non-empty,
# and unique to each shape: