From 2d9a2a2a8fe63d0683c97fcf49e194be1eebaa93 Mon Sep 17 00:00:00 2001
From: Sean Whalen <44679+seanthegeek@users.noreply.github.com>
Date: Thu, 4 Jun 2026 10:22:02 -0400
Subject: [PATCH] Fix JSON type handling and pre-json field init in SecOps parser

Two CBN behaviors, confirmed against Google's own "How to parse JSON data"
guide (statedump shows JSON true/199 retaining boolean/integer type) and the
published Corelight production parser:

1. The json{} filter preserves the original JSON type, so parsedmarc's
   boolean *_aligned / testing / normalized_timespan and numeric count /
   *_session_count / source_asn would never match string comparisons. Add a
   mutate{convert} step turning them into strings before any
   == "true"/"false" test or %{...} use.

2. CBN raises _failed_parsing_ when an `if [field]` references a field
   absent from the log, and most detection/mapping fields are absent in 2 of
   the 3 report shapes (or null within one). Initialize every
   conditionally-checked field to "" before the json{} filter.

Without these, DMARC-fail records would not be categorized AUTH_VIOLATION and
aggregate/TLS reports could fail parsing outright. README caveat and PR
validation steps updated accordingly.

Co-Authored-By: Claude Opus 4.8 (1M context)
---
 google_secops_parser/README.md       |  18 ++--
 google_secops_parser/parsedmarc.conf | 126 ++++++++++++++++++++++++---
 2 files changed, 127 insertions(+), 17 deletions(-)

diff --git a/google_secops_parser/README.md b/google_secops_parser/README.md
index 1f4d218..ba403f7 100644
--- a/google_secops_parser/README.md
+++ b/google_secops_parser/README.md
@@ -40,13 +40,17 @@ DMARC types.
 ## Caveats

 1. **Unvalidated** — see [Status](#status).
-2. **Boolean coercion** — parsedmarc emits `dmarc_aligned`, `spf_aligned`,
-   `dkim_aligned`, `testing`, and `normalized_timespan` as JSON booleans. The
-   parser assumes the `json{}` filter exposes them as the strings `"true"` /
-   `"false"` (the CBN convention) and compares them as such. The security-
-   relevant consequence to confirm in the validation tool: a DMARC-fail record
-   (`dmarc_aligned=false`) must receive `security_result.category =
-   AUTH_VIOLATION`.
+2. **JSON type handling** — parsedmarc emits `dmarc_aligned` / `spf_aligned` /
+   `dkim_aligned` / `testing` / `normalized_timespan` as JSON booleans and
+   `count` / `*_session_count` / `source_asn` as numbers. Chronicle's `json{}`
+   filter **preserves the original JSON type**, so the parser explicitly
+   converts these to strings (`mutate { convert => { … => "string" } }`) before
+   any comparison — otherwise `[dmarc_aligned] == "false"` would never match.
+   Relatedly, every field tested in an `if` is initialized to `""` *before* the
+   `json` filter, because CBN raises `_failed_parsing_` on a conditional that
+   references a field absent from the log. A DMARC-fail record
+   (`dmarc_aligned=false`) should yield `security_result.category =
+   AUTH_VIOLATION` — still worth confirming in the validation tool.
 3. **Aggregate count** — a DMARC aggregate record summarizes `count` messages
    from one source IP, not a single message. Each record becomes one
    `EMAIL_TRANSACTION` with `count` carried in `additional.fields`. There is no
diff --git a/google_secops_parser/parsedmarc.conf b/google_secops_parser/parsedmarc.conf
index 43f60e1..a3f4b37 100644
--- a/google_secops_parser/parsedmarc.conf
+++ b/google_secops_parser/parsedmarc.conf
@@ -49,12 +49,12 @@ filter {
   # 1. UNVALIDATED. This parser was written to the docs above but has not been
   #    run through the SecOps parser-validation tool against a live tenant.
   #    Validate with the sample events in README.md before production use.
-  # 2. BOOLEAN COERCION. parsedmarc emits *_aligned / testing /
-  #    normalized_timespan / sample_headers_only as JSON booleans. This parser
-  #    assumes the json{} filter exposes them as the strings "true"/"false"
-  #    (the CBN convention) and compares them as such. Confirm in the
-  #    validation tool that DMARC-fail records (dmarc_aligned=false) receive
-  #    security_result.category = AUTH_VIOLATION.
+  # 2. JSON TYPES ARE PRESERVED. The CBN json{} filter keeps the original JSON
+  #    type (Google's "How to parse JSON data" shows true staying boolean and
+  #    199 staying integer), so parsedmarc's boolean *_aligned / testing /
+  #    normalized_timespan and numeric count / *_count would NOT match string
+  #    comparisons. This parser converts them to strings (step 1b) before any
+  #    `== "true"` / `== "false"` test or %{...} use.
   # 3. AGGREGATE COUNT. A DMARC aggregate record summarizes "count" messages
   #    from one source IP, not a single message. Each becomes one
   #    EMAIL_TRANSACTION with "count" carried in additional.fields; there is no
@@ -66,18 +66,103 @@ filter {
   # ===========================================================================

   # ---------------------------------------------------------------------------
-  # 1. Extract the JSON object from the (possibly syslog-framed) raw line.
-  #    Python's SysLogHandler prepends a "" priority (and a forwarder may
-  #    add a timestamp/host/tag), so the JSON is not necessarily at column 0.
-  #    Grab everything from the first "{" to the last "}".
+  # 1a. Initialize every field used in a conditional check to "" BEFORE the json
+  #     filter. Chronicle's CBN raises _failed_parsing_ when an `if [field]`
+  #     tests a field that does not exist in the log, and most of these fields
+  #     are absent in 2 of the 3 report shapes (or null within one). The json
+  #     filter below overwrites whichever are actually present.
+  #     Ref: thatsiemguy "Parsing 101"; matches Google's default parsers.
   # ---------------------------------------------------------------------------
   mutate {
     replace => {
       "report_type" => ""
       "event_type" => ""
+
+      # report-type detection
+      "feedback_type" => ""
+      "policy_type" => ""
+      "domain" => ""
+
+      # aggregate
+      "report_id" => ""
+      "org_name" => ""
+      "org_email" => ""
+      "begin_date" => ""
+      "end_date" => ""
+      "count" => ""
+      "p" => ""
+      "sp" => ""
+      "np" => ""
+      "pct" => ""
+      "fo" => ""
+      "adkim" => ""
+      "aspf" => ""
+      "testing" => ""
+      "discovery_method" => ""
+      "normalized_timespan" => ""
+      "dmarc_aligned" => ""
+      "spf_aligned" => ""
+      "dkim_aligned" => ""
+      "disposition" => ""
+      "dkim_domains" => ""
+      "dkim_selectors" => ""
+      "dkim_results" => ""
+      "spf_domains" => ""
+      "spf_scopes" => ""
+      "spf_results" => ""
+      "policy_override_reasons" => ""
+      "policy_override_comments" => ""
+      "source_ip_address" => ""
+      "source_reverse_dns" => ""
+      "source_country" => ""
+      "source_base_domain" => ""
+      "source_name" => ""
+      "source_type" => ""
+      "source_asn" => ""
+      "source_as_name" => ""
+      "source_as_domain" => ""
+      "header_from" => ""
+      "envelope_from" => ""
+      "envelope_to" => ""
+
+      # failure
+      "message_id" => ""
+      "arrival_date_utc" => ""
+      "arrival_date" => ""
+      "reported_domain" => ""
+      "original_mail_from" => ""
+      "original_rcpt_to" => ""
+      "subject" => ""
+      "delivery_result" => ""
+      "auth_failure" => ""
+      "authentication_results" => ""
+      "authentication_mechanisms" => ""
+      "user_agent" => ""
+      "dkim_domain" => ""
+
+      # smtp tls
+      "policy_domain" => ""
+      "receiving_ip" => ""
+      "sending_mta_ip" => ""
+      "result_type" => ""
+      "organization_name" => ""
+      "policy_strings" => ""
+      "mx_host_patterns" => ""
+      "successful_session_count" => ""
+      "failed_session_count" => ""
+      "failure_reason_code" => ""
+      "receiving_mx_hostname" => ""
+      "receiving_mx_helo" => ""
+      "additional_info_uri" => ""
     }
   }

+  # ---------------------------------------------------------------------------
+  # 1. Extract the JSON object from the (possibly syslog-framed) raw line.
+  #    Python's SysLogHandler prepends a "" priority (and a forwarder may
+  #    add a timestamp/host/tag), so the JSON is not necessarily at column 0.
+  #    Grab everything from the first "{" to the last "}".
+  # ---------------------------------------------------------------------------
   grok {
     match => {
       "message" => ["^.*?(?P\\{.*\\})\\s*$"]
@@ -96,6 +181,27 @@ filter {
     drop {}
   }

+  # ---------------------------------------------------------------------------
+  # 1b. Convert parsedmarc's JSON booleans/numbers to strings. The json{} filter
+  #     PRESERVES the original JSON type, so without this, [dmarc_aligned] is a
+  #     boolean and `== "false"` never matches (and %{count} on an int is
+  #     unreliable). Fields left as "" by step 1a convert as a harmless no-op.
+  # ---------------------------------------------------------------------------
+  mutate {
+    convert => {
+      "dmarc_aligned" => "string"
+      "spf_aligned" => "string"
+      "dkim_aligned" => "string"
+      "testing" => "string"
+      "normalized_timespan" => "string"
+      "count" => "string"
+      "source_asn" => "string"
+      "successful_session_count" => "string"
+      "failed_session_count" => "string"
+    }
+    on_error => "convert_error"
+  }
+
   # ---------------------------------------------------------------------------
   # 2. Detect the report type from a field that is always present, non-empty,
   #    and unique to each shape: