filter {
  # ===========================================================================
  # parsedmarc -> Google Security Operations (Chronicle) UDM parser
  #
  # Consumes the single-line JSON events that parsedmarc emits through its
  # built-in [syslog] output and maps them to the Unified Data Model (UDM).
  # No changes to parsedmarc are required: this is a SecOps-side custom parser
  # (configuration-based normalizer / CBN).
  #
  # parsedmarc emits three flat JSON shapes, one object per syslog line, via the
  # CSV-row serializers (parsed_aggregate/failure/smtp_tls_reports_to_csv_rows):
  #   * DMARC aggregate report record -> detected by "xml_schema"
  #   * DMARC failure report record   -> detected by "feedback_type"
  #   * SMTP TLS report record        -> detected by "policy_type"
  #
  # ---------------------------------------------------------------------------
  # UDM facts this parser relies on (all from official Google documentation):
  #
  #   * Output object & finalize: build fields under
  #     event.idm.read_only_udm.* and emit with
  #       mutate { merge => { "@output" => "event" } }
  #     https://cloud.google.com/chronicle/docs/event-processing/parser-syntax
  #   * Every event MUST set metadata.event_type (predefined enum) and
  #     metadata.event_timestamp; parsing fails if a required field is missing.
  #     A date{} filter with no target sets metadata.event_timestamp; if no
  #     date{} runs, event_timestamp defaults to ingest time.
  #     https://cloud.google.com/chronicle/docs/event-processing/parsing-overview
  #   * Every event MUST contain at least one noun
  #     (principal/target/src/intermediary/observer). principal must carry a
  #     machine or user detail and must NOT carry email.
  #     https://cloud.google.com/chronicle/docs/event-processing/udm-overview
  #   * EMAIL_TRANSACTION and GENERIC_EVENT are valid metadata.event_type values.
  #     GENERIC_EVENT only surfaces in raw-log / UDM search, not curated views.
  #     https://cloud.google.com/chronicle/docs/investigation/udm-search
  #   * security_result.action enum: UNKNOWN_ACTION, ALLOW, BLOCK,
  #     ALLOW_WITH_MODIFICATION, QUARANTINE, FAIL, CHALLENGE.
  #     security_result.category enum includes AUTH_VIOLATION
  #     ("Authentication failed"), POLICY_VIOLATION, MAIL_SPOOFING. There is no
  #     "passed" category, so we set a category only on failures.
  #     https://cloud.google.com/chronicle/docs/reference/rest/v1alpha/SecurityResult
  #   * network.email fields: from, reply_to, to, cc, bcc, mail_id, subject.
  #     (There is no network.email.mail_from.)
  #     https://cloud.google.com/chronicle/docs/reference/udm-field-list
  #   * REPEATED fields are populated with merge, never replace. The UDM field
  #     list marks security_result.action, security_result.category,
  #     network.email.to and network.email.subject as repeated, so each is
  #     built from a scalar token and merged (same as principal.ip / target.ip).
  #     https://cloud.google.com/chronicle/docs/reference/udm-field-list
  #
  # ---------------------------------------------------------------------------
  # Caveats (read before trusting output):
  #
  #   1. UNVALIDATED. This parser was written to the docs above but has not been
  #      run through the SecOps parser-validation tool against a live tenant.
  #      Validate with the sample events in README.md before production use.
  #   2. JSON TYPES ARE PRESERVED. The CBN json{} filter keeps the original JSON
  #      type (a JSON boolean stays a boolean, a number stays a number).
  #      Booleans (*_aligned / testing / normalized_timespan) are converted to
  #      strings (step 1b) so `== "true"` / `== "false"` tests work, and stored
  #      as string_value (content-hub never uses bool_value). Numbers (count /
  #      *_session_count / source_asn) are stored as number_value -- converted
  #      to a string at the point of use (so "%{...}" interpolation never sees
  #      a non-string token), built as a string, convert-ed to uinteger, then
  #      renamed to number_value -- so SecOps can range-query and sort them.
  #      Matches the content-hub parsers.
  #   2b. CONDITIONALS. Every field tested in an `if` is initialized to "" in
  #      step 1a, and guards use `if [field] != ""` (not bare `if [field]`,
  #      which is true for an initialized-but-empty field). Matches content-hub.
  #   3. AGGREGATE COUNT. A DMARC aggregate record summarizes "count" messages
  #      from one source IP, not a single message. Each becomes one
  #      EMAIL_TRANSACTION with "count" carried in additional.fields; there is no
  #      first-class per-message expansion (fanning out would be wrong).
  #   4. ADDRESS FORMAT. Aggregate header_from is a bare domain (DMARC only
  #      reports the From domain). It is still mapped to network.email.from as
  #      the best-available sender identity; downstream consumers should expect a
  #      domain there for aggregate events, an address for failure events.
  # ===========================================================================

  # ---------------------------------------------------------------------------
  # 1a. Initialize every field used in a conditional check to "" BEFORE the json
  #     filter. Chronicle's CBN raises _failed_parsing_ when an `if [field]`
  #     tests a field that does not exist in the log, and most of these fields
  #     are absent in 2 of the 3 report shapes (or null within one). The json
  #     filter below overwrites whichever are actually present.
  #     Ref: Google's official content-hub CBN parsers (e.g. CLOUDFLARE_PAGESHIELD)
  #     initialize fields before json the same way; see also thatsiemguy "Parsing 101".
  # ---------------------------------------------------------------------------
  mutate {
    replace => {
      "report_type" => ""
      "event_type" => ""
      # report-type detection
      "feedback_type" => ""
      "policy_type" => ""
      "xml_schema" => ""
      # aggregate
      "domain" => ""
      "report_id" => ""
      "org_name" => ""
      "org_email" => ""
      "begin_date" => ""
      "end_date" => ""
      "count" => ""
      "p" => ""
      "sp" => ""
      "np" => ""
      "pct" => ""
      "fo" => ""
      "adkim" => ""
      "aspf" => ""
      "testing" => ""
      "discovery_method" => ""
      "normalized_timespan" => ""
      "dmarc_aligned" => ""
      "spf_aligned" => ""
      "dkim_aligned" => ""
      "disposition" => ""
      "dkim_domains" => ""
      "dkim_selectors" => ""
      "dkim_results" => ""
      "spf_domains" => ""
      "spf_scopes" => ""
      "spf_results" => ""
      "policy_override_reasons" => ""
      "policy_override_comments" => ""
      "source_ip_address" => ""
      "source_reverse_dns" => ""
      "source_country" => ""
      "source_base_domain" => ""
      "source_name" => ""
      "source_type" => ""
      "source_asn" => ""
      "source_as_name" => ""
      "source_as_domain" => ""
      "header_from" => ""
      "envelope_from" => ""
      "envelope_to" => ""
      # failure
      "message_id" => ""
      "arrival_date_utc" => ""
      "arrival_date" => ""
      "reported_domain" => ""
      "original_mail_from" => ""
      "original_rcpt_to" => ""
      "subject" => ""
      "delivery_result" => ""
      "auth_failure" => ""
      "authentication_results" => ""
      "authentication_mechanisms" => ""
      "user_agent" => ""
      "dkim_domain" => ""
      # smtp tls
      "policy_domain" => ""
      "receiving_ip" => ""
      "sending_mta_ip" => ""
      "result_type" => ""
      "organization_name" => ""
      "policy_strings" => ""
      "mx_host_patterns" => ""
      "successful_session_count" => ""
      "failed_session_count" => ""
      "failure_reason_code" => ""
      "receiving_mx_hostname" => ""
      "receiving_mx_helo" => ""
      "additional_info_uri" => ""
    }
  }

  # ---------------------------------------------------------------------------
  # 1. Extract the JSON object from the (possibly syslog-framed) raw line.
  #    Python's SysLogHandler prepends a "<PRI>" priority (and a forwarder may
  #    add a timestamp/host/tag), so the JSON is not necessarily at column 0.
  #    Grab everything from the first "{" to the last "}" into the named
  #    capture "payload", which the json{} filter below reads.
  # ---------------------------------------------------------------------------
  grok {
    match => {
      "message" => ["^.*?(?P<payload>\\{.*\\})\\s*$"]
    }
    on_error => "no_json_payload"
  }
  if [no_json_payload] {
    drop { tag => "TAG_MALFORMED_ENCODING" }
  }

  json {
    source => "payload"
    on_error => "not_json"
  }
  if [not_json] {
    drop { tag => "TAG_MALFORMED_ENCODING" }
  }

  # ---------------------------------------------------------------------------
  # 1b. Convert parsedmarc's JSON booleans to strings so they can be compared.
  #     The json{} filter PRESERVES the original JSON type, so without this
  #     [dmarc_aligned] is a boolean and `== "false"` never matches. Booleans are
  #     stored as string_value (matching Google's content-hub parsers, which
  #     never use bool_value). Numeric fields are NOT converted here; each one
  #     is converted to a string immediately before it is interpolated, then
  #     stored as number_value (not string) in additional.fields so SecOps can
  #     range-query and sort them; see the count / *_session_count /
  #     source_asn blocks below.
  # ---------------------------------------------------------------------------
  mutate {
    convert => {
      "dmarc_aligned" => "string"
      "spf_aligned" => "string"
      "dkim_aligned" => "string"
      "testing" => "string"
      "normalized_timespan" => "string"
    }
    on_error => "convert_error"
  }

  # ---------------------------------------------------------------------------
  # 2. Detect the report type from a field that is always present, non-empty,
  #    and unique to each shape:
  #      feedback_type -> failure
  #      policy_type   -> smtp_tls
  #      xml_schema    -> aggregate
  #    xml_schema is aggregate-only and parsedmarc defaults it to "draft" when
  #    the report omits <version> (parsedmarc/__init__.py), so it survives a
  #    missing version. It is preferred over: header_from (can be empty when a
  #    record carries no identifiers), adkim (a defaulted policy field), domain
  #    (a generic name), and dmarc_aligned (a boolean that only becomes testable
  #    after the convert in step 1b -- detection should not depend on that).
  # ---------------------------------------------------------------------------
  if [feedback_type] != "" {
    mutate { replace => { "report_type" => "failure" } }
  } else if [policy_type] != "" {
    mutate { replace => { "report_type" => "smtp_tls" } }
  } else if [xml_schema] != "" {
    mutate { replace => { "report_type" => "aggregate" } }
  }

  # Not a parsedmarc record we recognize: drop rather than emit an invalid event.
  if [report_type] == "" {
    drop { tag => "TAG_UNSUPPORTED" }
  }

  # ===========================================================================
  # DMARC AGGREGATE -> EMAIL_TRANSACTION
  # ===========================================================================
  if [report_type] == "aggregate" {
    mutate { replace => { "event_type" => "EMAIL_TRANSACTION" } }

    if [report_id] != "" {
      mutate { replace => { "event.idm.read_only_udm.metadata.product_log_id" => "%{report_id}" } }
    }

    # -- event timestamp: start of the record's reporting interval (UTC). --
    if [begin_date] != "" {
      date {
        match => ["begin_date", "yyyy-MM-dd HH:mm:ss"]
        timezone => "UTC"
        on_error => "agg_date_error"
      }
    }

    # -- principal: the sending source (machine details only). --
    if [source_ip_address] != "" {
      mutate {
        merge => { "event.idm.read_only_udm.principal.ip" => "source_ip_address" }
        on_error => "agg_src_ip_error"
      }
    }
    if [source_reverse_dns] != "" {
      mutate { replace => { "event.idm.read_only_udm.principal.hostname" => "%{source_reverse_dns}" } }
    }
    if [source_country] != "" {
      mutate { replace => { "event.idm.read_only_udm.principal.location.country_or_region" => "%{source_country}" } }
    }

    # -- target: the domain the report is about. --
    if [domain] != "" {
      mutate { replace => { "event.idm.read_only_udm.target.hostname" => "%{domain}" } }
    }

    # -- email: aggregate only carries the From domain (see caveat 4). --
    if [header_from] != "" {
      mutate { replace => { "event.idm.read_only_udm.network.email.from" => "%{header_from}" } }
    }

    # -- security_result: disposition -> action; failed alignment -> category. --
    mutate { replace => { "sr.summary" => "DMARC aggregate report" } }
    # action is a repeated enum: pick the value into a scalar token, then merge.
    if [disposition] == "reject" {
      mutate { replace => { "sr_action" => "BLOCK" } }
    } else if [disposition] == "quarantine" {
      mutate { replace => { "sr_action" => "QUARANTINE" } }
    } else if [disposition] == "none" {
      mutate { replace => { "sr_action" => "ALLOW" } }
    } else {
      mutate { replace => { "sr_action" => "UNKNOWN_ACTION" } }
    }
    mutate { merge => { "sr.action" => "sr_action" } }

    # DMARC failed authentication alignment -> AUTH_VIOLATION (see caveat 2).
    # category is a repeated enum as well, so it is merged like action.
    if [dmarc_aligned] == "false" {
      mutate { replace => { "sr_category" => "AUTH_VIOLATION" } }
      mutate { merge => { "sr.category" => "sr_category" } }
    }
    mutate {
      replace => { "sr.description" => "dmarc_aligned=%{dmarc_aligned} spf_aligned=%{spf_aligned} dkim_aligned=%{dkim_aligned} disposition=%{disposition}" }
      on_error => "agg_sr_desc_error"
    }
    mutate { merge => { "event.idm.read_only_udm.security_result" => "sr" } }

    # -- additional.fields: DMARC dimensions a dashboard would filter on. --
    if [org_name] != "" {
      mutate {
        replace => { "f_org_name.key" => "org_name" "f_org_name.value.string_value" => "%{org_name}" }
        merge => { "event.idm.read_only_udm.additional.fields" => "f_org_name" }
      }
    }
    if [org_email] != "" {
      mutate {
        replace => { "f_org_email.key" => "org_email" "f_org_email.value.string_value" => "%{org_email}" }
        merge => { "event.idm.read_only_udm.additional.fields" => "f_org_email" }
      }
    }
    if [begin_date] != "" {
      mutate {
        replace => { "f_begin.key" => "begin_date" "f_begin.value.string_value" => "%{begin_date}" }
        merge => { "event.idm.read_only_udm.additional.fields" => "f_begin" }
      }
    }
    if [end_date] != "" {
      mutate {
        replace => { "f_end.key" => "end_date" "f_end.value.string_value" => "%{end_date}" }
        merge => { "event.idm.read_only_udm.additional.fields" => "f_end" }
      }
    }

    # count: JSON number -> string (safe to interpolate) -> uinteger -> number_value.
    # An absent/non-numeric count sets count_nan and the field is simply omitted.
    mutate {
      convert => { "count" => "string" }
      on_error => "count_str_error"
    }
    mutate { replace => { "f_count.key" => "count" "f_count.value.string_value" => "%{count}" } }
    mutate {
      convert => { "f_count.value.string_value" => "uinteger" }
      on_error => "count_nan"
    }
    if ![count_nan] {
      mutate { rename => { "f_count.value.string_value" => "f_count.value.number_value" } }
      mutate { merge => { "event.idm.read_only_udm.additional.fields" => "f_count" } }
    }

    if [p] != "" {
      mutate {
        replace => { "f_p.key" => "dmarc_policy" "f_p.value.string_value" => "%{p}" }
        merge => { "event.idm.read_only_udm.additional.fields" => "f_p" }
      }
    }
    if [sp] != "" {
      mutate {
        replace => { "f_sp.key" => "dmarc_subdomain_policy" "f_sp.value.string_value" => "%{sp}" }
        merge => { "event.idm.read_only_udm.additional.fields" => "f_sp" }
      }
    }
    if [np] != "" {
      mutate {
        replace => { "f_np.key" => "dmarc_np_policy" "f_np.value.string_value" => "%{np}" }
        merge => { "event.idm.read_only_udm.additional.fields" => "f_np" }
      }
    }
    if [pct] != "" {
      mutate {
        replace => { "f_pct.key" => "dmarc_pct" "f_pct.value.string_value" => "%{pct}" }
        merge => { "event.idm.read_only_udm.additional.fields" => "f_pct" }
      }
    }
    if [fo] != "" {
      mutate {
        replace => { "f_fo.key" => "dmarc_fo" "f_fo.value.string_value" => "%{fo}" }
        merge => { "event.idm.read_only_udm.additional.fields" => "f_fo" }
      }
    }
    if [adkim] != "" {
      mutate {
        replace => { "f_adkim.key" => "dkim_alignment_mode" "f_adkim.value.string_value" => "%{adkim}" }
        merge => { "event.idm.read_only_udm.additional.fields" => "f_adkim" }
      }
    }
    if [aspf] != "" {
      mutate {
        replace => { "f_aspf.key" => "spf_alignment_mode" "f_aspf.value.string_value" => "%{aspf}" }
        merge => { "event.idm.read_only_udm.additional.fields" => "f_aspf" }
      }
    }
    if [testing] == "true" or [testing] == "false" {
      mutate {
        replace => { "f_testing.key" => "dmarc_testing" "f_testing.value.string_value" => "%{testing}" }
        merge => { "event.idm.read_only_udm.additional.fields" => "f_testing" }
      }
    }
    if [discovery_method] != "" {
      mutate {
        replace => { "f_disc.key" => "discovery_method" "f_disc.value.string_value" => "%{discovery_method}" }
        merge => { "event.idm.read_only_udm.additional.fields" => "f_disc" }
      }
    }
    if [normalized_timespan] == "true" or [normalized_timespan] == "false" {
      mutate {
        replace => { "f_norm.key" => "normalized_timespan" "f_norm.value.string_value" => "%{normalized_timespan}" }
        merge => { "event.idm.read_only_udm.additional.fields" => "f_norm" }
      }
    }

    # Alignment outcomes (security-relevant; emit for both true and false).
    if [dmarc_aligned] == "true" or [dmarc_aligned] == "false" {
      mutate {
        replace => { "f_dmarc.key" => "dmarc_aligned" "f_dmarc.value.string_value" => "%{dmarc_aligned}" }
        merge => { "event.idm.read_only_udm.additional.fields" => "f_dmarc" }
      }
    }
    if [spf_aligned] == "true" or [spf_aligned] == "false" {
      mutate {
        replace => { "f_spfa.key" => "spf_aligned" "f_spfa.value.string_value" => "%{spf_aligned}" }
        merge => { "event.idm.read_only_udm.additional.fields" => "f_spfa" }
      }
    }
    if [dkim_aligned] == "true" or [dkim_aligned] == "false" {
      mutate {
        replace => { "f_dkima.key" => "dkim_aligned" "f_dkima.value.string_value" => "%{dkim_aligned}" }
        merge => { "event.idm.read_only_udm.additional.fields" => "f_dkima" }
      }
    }
    if [disposition] != "" {
      mutate {
        replace => { "f_disp.key" => "disposition" "f_disp.value.string_value" => "%{disposition}" }
        merge => { "event.idm.read_only_udm.additional.fields" => "f_disp" }
      }
    }

    # DKIM / SPF auth detail (comma-joined by parsedmarc).
    if [dkim_domains] != "" {
      mutate {
        replace => { "f_dkd.key" => "dkim_domains" "f_dkd.value.string_value" => "%{dkim_domains}" }
        merge => { "event.idm.read_only_udm.additional.fields" => "f_dkd" }
      }
    }
    if [dkim_selectors] != "" {
      mutate {
        replace => { "f_dks.key" => "dkim_selectors" "f_dks.value.string_value" => "%{dkim_selectors}" }
        merge => { "event.idm.read_only_udm.additional.fields" => "f_dks" }
      }
    }
    if [dkim_results] != "" {
      mutate {
        replace => { "f_dkr.key" => "dkim_results" "f_dkr.value.string_value" => "%{dkim_results}" }
        merge => { "event.idm.read_only_udm.additional.fields" => "f_dkr" }
      }
    }
    if [spf_domains] != "" {
      mutate {
        replace => { "f_spd.key" => "spf_domains" "f_spd.value.string_value" => "%{spf_domains}" }
        merge => { "event.idm.read_only_udm.additional.fields" => "f_spd" }
      }
    }
    if [spf_scopes] != "" {
      mutate {
        replace => { "f_sps.key" => "spf_scopes" "f_sps.value.string_value" => "%{spf_scopes}" }
        merge => { "event.idm.read_only_udm.additional.fields" => "f_sps" }
      }
    }
    if [spf_results] != "" {
      mutate {
        replace => { "f_spr.key" => "spf_results" "f_spr.value.string_value" => "%{spf_results}" }
        merge => { "event.idm.read_only_udm.additional.fields" => "f_spr" }
      }
    }

    # Policy overrides (parsedmarc writes the literal "none" when there are none).
    if [policy_override_reasons] != "" and [policy_override_reasons] != "none" {
      mutate {
        replace => { "f_por.key" => "policy_override_reasons" "f_por.value.string_value" => "%{policy_override_reasons}" }
        merge => { "event.idm.read_only_udm.additional.fields" => "f_por" }
      }
    }
    if [policy_override_comments] != "" and [policy_override_comments] != "none" {
      mutate {
        replace => { "f_poc.key" => "policy_override_comments" "f_poc.value.string_value" => "%{policy_override_comments}" }
        merge => { "event.idm.read_only_udm.additional.fields" => "f_poc" }
      }
    }

    # Source enrichment from parsedmarc's reverse-DNS / MMDB maps.
    if [source_base_domain] != "" {
      mutate {
        replace => { "f_sbd.key" => "source_base_domain" "f_sbd.value.string_value" => "%{source_base_domain}" }
        merge => { "event.idm.read_only_udm.additional.fields" => "f_sbd" }
      }
    }
    if [source_name] != "" {
      mutate {
        replace => { "f_sn.key" => "source_name" "f_sn.value.string_value" => "%{source_name}" }
        merge => { "event.idm.read_only_udm.additional.fields" => "f_sn" }
      }
    }
    if [source_type] != "" {
      mutate {
        replace => { "f_st.key" => "source_type" "f_st.value.string_value" => "%{source_type}" }
        merge => { "event.idm.read_only_udm.additional.fields" => "f_st" }
      }
    }

    # source_asn: same number_value pipeline as count; omitted when absent.
    mutate {
      convert => { "source_asn" => "string" }
      on_error => "asn_str_error"
    }
    mutate { replace => { "f_asn.key" => "source_asn" "f_asn.value.string_value" => "%{source_asn}" } }
    mutate {
      convert => { "f_asn.value.string_value" => "uinteger" }
      on_error => "asn_nan"
    }
    if ![asn_nan] {
      mutate { rename => { "f_asn.value.string_value" => "f_asn.value.number_value" } }
      mutate { merge => { "event.idm.read_only_udm.additional.fields" => "f_asn" } }
    }

    if [source_as_name] != "" {
      mutate {
        replace => { "f_asnm.key" => "source_as_name" "f_asnm.value.string_value" => "%{source_as_name}" }
        merge => { "event.idm.read_only_udm.additional.fields" => "f_asnm" }
      }
    }
    if [source_as_domain] != "" {
      mutate {
        replace => { "f_asd.key" => "source_as_domain" "f_asd.value.string_value" => "%{source_as_domain}" }
        merge => { "event.idm.read_only_udm.additional.fields" => "f_asd" }
      }
    }

    # Envelope identifiers (usually empty in aggregate reports).
    if [envelope_from] != "" {
      mutate {
        replace => { "f_ef.key" => "envelope_from" "f_ef.value.string_value" => "%{envelope_from}" }
        merge => { "event.idm.read_only_udm.additional.fields" => "f_ef" }
      }
    }
    if [envelope_to] != "" {
      mutate {
        replace => { "f_et.key" => "envelope_to" "f_et.value.string_value" => "%{envelope_to}" }
        merge => { "event.idm.read_only_udm.additional.fields" => "f_et" }
      }
    }
  }

  # ===========================================================================
  # DMARC FAILURE -> EMAIL_TRANSACTION
  # ===========================================================================
  if [report_type] == "failure" {
    mutate { replace => { "event_type" => "EMAIL_TRANSACTION" } }

    if [message_id] != "" {
      mutate { replace => { "event.idm.read_only_udm.metadata.product_log_id" => "%{message_id}" } }
    }

    # -- event timestamp: message arrival time (UTC). --
    if [arrival_date_utc] != "" {
      date {
        match => ["arrival_date_utc", "yyyy-MM-dd HH:mm:ss"]
        timezone => "UTC"
        on_error => "fail_date_error"
      }
    }

    # -- principal: the sending source. --
    if [source_ip_address] != "" {
      mutate {
        merge => { "event.idm.read_only_udm.principal.ip" => "source_ip_address" }
        on_error => "fail_src_ip_error"
      }
    }
    if [source_reverse_dns] != "" {
      mutate { replace => { "event.idm.read_only_udm.principal.hostname" => "%{source_reverse_dns}" } }
    }
    if [source_country] != "" {
      mutate { replace => { "event.idm.read_only_udm.principal.location.country_or_region" => "%{source_country}" } }
    }

    # -- target: the reported (claimed) domain. --
    if [reported_domain] != "" {
      mutate { replace => { "event.idm.read_only_udm.target.hostname" => "%{reported_domain}" } }
    }

    # -- email: failure reports carry full addresses + subject. --
    if [original_mail_from] != "" {
      mutate { replace => { "event.idm.read_only_udm.network.email.from" => "%{original_mail_from}" } }
    }
    # network.email.to and network.email.subject are repeated fields: merge the
    # source token in rather than replace-ing a scalar onto the list.
    if [original_rcpt_to] != "" {
      mutate {
        merge => { "event.idm.read_only_udm.network.email.to" => "original_rcpt_to" }
        on_error => "fail_rcpt_error"
      }
    }
    if [subject] != "" {
      mutate {
        merge => { "event.idm.read_only_udm.network.email.subject" => "subject" }
        on_error => "fail_subject_error"
      }
    }
    if [message_id] != "" {
      mutate { replace => { "event.idm.read_only_udm.network.email.mail_id" => "%{message_id}" } }
    }

    # -- security_result: a failure report is an authentication failure. --
    mutate {
      replace => {
        "srf.summary" => "DMARC failure report"
        "srf_category" => "AUTH_VIOLATION"
      }
    }
    # category / action are repeated enums: merge from scalar tokens.
    mutate { merge => { "srf.category" => "srf_category" } }
    if [delivery_result] == "reject" {
      mutate { replace => { "srf_action" => "BLOCK" } }
    } else if [delivery_result] == "quarantine" {
      mutate { replace => { "srf_action" => "QUARANTINE" } }
    } else if [delivery_result] == "delivered" {
      mutate { replace => { "srf_action" => "ALLOW" } }
    } else {
      mutate { replace => { "srf_action" => "UNKNOWN_ACTION" } }
    }
    mutate { merge => { "srf.action" => "srf_action" } }
    if [auth_failure] != "" {
      mutate {
        replace => { "srf.description" => "auth_failure=%{auth_failure} delivery_result=%{delivery_result}" }
        on_error => "fail_sr_desc_error"
      }
    }
    mutate { merge => { "event.idm.read_only_udm.security_result" => "srf" } }

    # -- additional.fields. --
    if [feedback_type] != "" {
      mutate {
        replace => { "g_fb.key" => "feedback_type" "g_fb.value.string_value" => "%{feedback_type}" }
        merge => { "event.idm.read_only_udm.additional.fields" => "g_fb" }
      }
    }
    if [auth_failure] != "" {
      mutate {
        replace => { "g_af.key" => "auth_failure" "g_af.value.string_value" => "%{auth_failure}" }
        merge => { "event.idm.read_only_udm.additional.fields" => "g_af" }
      }
    }
    if [delivery_result] != "" {
      mutate {
        replace => { "g_dr.key" => "delivery_result" "g_dr.value.string_value" => "%{delivery_result}" }
        merge => { "event.idm.read_only_udm.additional.fields" => "g_dr" }
      }
    }
    if [authentication_results] != "" {
      mutate {
        replace => { "g_ar.key" => "authentication_results" "g_ar.value.string_value" => "%{authentication_results}" }
        merge => { "event.idm.read_only_udm.additional.fields" => "g_ar" }
      }
    }
    if [authentication_mechanisms] != "" {
      mutate {
        replace => { "g_am.key" => "authentication_mechanisms" "g_am.value.string_value" => "%{authentication_mechanisms}" }
        merge => { "event.idm.read_only_udm.additional.fields" => "g_am" }
      }
    }
    if [user_agent] != "" {
      mutate {
        replace => { "g_ua.key" => "user_agent" "g_ua.value.string_value" => "%{user_agent}" }
        merge => { "event.idm.read_only_udm.additional.fields" => "g_ua" }
      }
    }
    if [dkim_domain] != "" {
      mutate {
        replace => { "g_dd.key" => "dkim_domain" "g_dd.value.string_value" => "%{dkim_domain}" }
        merge => { "event.idm.read_only_udm.additional.fields" => "g_dd" }
      }
    }
    if [arrival_date] != "" {
      mutate {
        replace => { "g_ad.key" => "arrival_date" "g_ad.value.string_value" => "%{arrival_date}" }
        merge => { "event.idm.read_only_udm.additional.fields" => "g_ad" }
      }
    }
  }

  # ===========================================================================
  # SMTP TLS -> GENERIC_EVENT
  # No per-message semantics; modeled as a generic event whose noun is the
  # reported policy domain (target.hostname), which is always present. Per the
  # docs, GENERIC_EVENT only appears in raw-log / UDM search, not curated views.
  # ===========================================================================
  if [report_type] == "smtp_tls" {
    mutate { replace => { "event_type" => "GENERIC_EVENT" } }

    if [report_id] != "" {
      mutate { replace => { "event.idm.read_only_udm.metadata.product_log_id" => "%{report_id}" } }
    }

    # -- event timestamp: start of the report window. SMTP TLS uses ISO 8601
    #    with a trailing Z (e.g. 2025-12-07T19:00:00Z), unlike the other types. --
    if [begin_date] != "" {
      date {
        match => ["begin_date", "yyyy-MM-dd'T'HH:mm:ss'Z'"]
        timezone => "UTC"
        on_error => "tls_date_error"
      }
    }

    # -- target: the reported policy domain (always present -> the noun). --
    if [policy_domain] != "" {
      mutate { replace => { "event.idm.read_only_udm.target.hostname" => "%{policy_domain}" } }
    }
    # receiving MTA IP, when a per-session failure row carries it.
    if [receiving_ip] != "" {
      mutate {
        merge => { "event.idm.read_only_udm.target.ip" => "receiving_ip" }
        on_error => "tls_rcv_ip_error"
      }
    }
    # sending MTA IP, when present, is the principal.
    if [sending_mta_ip] != "" {
      mutate {
        merge => { "event.idm.read_only_udm.principal.ip" => "sending_mta_ip" }
        on_error => "tls_snd_ip_error"
      }
    }

    # -- security_result only on failures (result_type present). --
    if [result_type] != "" {
      mutate {
        replace => {
          "srt.summary" => "SMTP TLS report failure"
          "srt_category" => "POLICY_VIOLATION"
          "srt_action" => "FAIL"
          "srt.description" => "result_type=%{result_type}"
        }
        on_error => "tls_sr_error"
      }
      # category / action are repeated enums: merge from scalar tokens.
      mutate {
        merge => { "srt.category" => "srt_category" }
        on_error => "tls_sr_category_error"
      }
      mutate {
        merge => { "srt.action" => "srt_action" }
        on_error => "tls_sr_action_error"
      }
      mutate { merge => { "event.idm.read_only_udm.security_result" => "srt" } }
    }

    # -- additional.fields. --
    if [organization_name] != "" {
      mutate {
        replace => { "t_org.key" => "organization_name" "t_org.value.string_value" => "%{organization_name}" }
        merge => { "event.idm.read_only_udm.additional.fields" => "t_org" }
      }
    }
    if [begin_date] != "" {
      mutate {
        replace => { "t_begin.key" => "begin_date" "t_begin.value.string_value" => "%{begin_date}" }
        merge => { "event.idm.read_only_udm.additional.fields" => "t_begin" }
      }
    }
    if [end_date] != "" {
      mutate {
        replace => { "t_end.key" => "end_date" "t_end.value.string_value" => "%{end_date}" }
        merge => { "event.idm.read_only_udm.additional.fields" => "t_end" }
      }
    }
    if [policy_domain] != "" {
      mutate {
        replace => { "t_pd.key" => "policy_domain" "t_pd.value.string_value" => "%{policy_domain}" }
        merge => { "event.idm.read_only_udm.additional.fields" => "t_pd" }
      }
    }
    if [policy_type] != "" {
      mutate {
        replace => { "t_pt.key" => "policy_type" "t_pt.value.string_value" => "%{policy_type}" }
        merge => { "event.idm.read_only_udm.additional.fields" => "t_pt" }
      }
    }
    if [policy_strings] != "" {
      mutate {
        replace => { "t_ps.key" => "policy_strings" "t_ps.value.string_value" => "%{policy_strings}" }
        merge => { "event.idm.read_only_udm.additional.fields" => "t_ps" }
      }
    }
    if [mx_host_patterns] != "" {
      mutate {
        replace => { "t_mx.key" => "mx_host_patterns" "t_mx.value.string_value" => "%{mx_host_patterns}" }
        merge => { "event.idm.read_only_udm.additional.fields" => "t_mx" }
      }
    }

    # Session counters: JSON number -> string -> uinteger -> number_value.
    mutate {
      convert => { "successful_session_count" => "string" }
      on_error => "ssc_str_error"
    }
    mutate { replace => { "t_ssc.key" => "successful_session_count" "t_ssc.value.string_value" => "%{successful_session_count}" } }
    mutate {
      convert => { "t_ssc.value.string_value" => "uinteger" }
      on_error => "ssc_nan"
    }
    if ![ssc_nan] {
      mutate { rename => { "t_ssc.value.string_value" => "t_ssc.value.number_value" } }
      mutate { merge => { "event.idm.read_only_udm.additional.fields" => "t_ssc" } }
    }

    mutate {
      convert => { "failed_session_count" => "string" }
      on_error => "fsc_str_error"
    }
    mutate { replace => { "t_fsc.key" => "failed_session_count" "t_fsc.value.string_value" => "%{failed_session_count}" } }
    mutate {
      convert => { "t_fsc.value.string_value" => "uinteger" }
      on_error => "fsc_nan"
    }
    if ![fsc_nan] {
      mutate { rename => { "t_fsc.value.string_value" => "t_fsc.value.number_value" } }
      mutate { merge => { "event.idm.read_only_udm.additional.fields" => "t_fsc" } }
    }

    if [result_type] != "" {
      mutate {
        replace => { "t_rt.key" => "result_type" "t_rt.value.string_value" => "%{result_type}" }
        merge => { "event.idm.read_only_udm.additional.fields" => "t_rt" }
      }
    }
    if [failure_reason_code] != "" {
      mutate {
        replace => { "t_frc.key" => "failure_reason_code" "t_frc.value.string_value" => "%{failure_reason_code}" }
        merge => { "event.idm.read_only_udm.additional.fields" => "t_frc" }
      }
    }
    if [receiving_mx_hostname] != "" {
      mutate {
        replace => { "t_rmh.key" => "receiving_mx_hostname" "t_rmh.value.string_value" => "%{receiving_mx_hostname}" }
        merge => { "event.idm.read_only_udm.additional.fields" => "t_rmh" }
      }
    }
    if [receiving_mx_helo] != "" {
      mutate {
        replace => { "t_rmhelo.key" => "receiving_mx_helo" "t_rmhelo.value.string_value" => "%{receiving_mx_helo}" }
        merge => { "event.idm.read_only_udm.additional.fields" => "t_rmhelo" }
      }
    }
    if [additional_info_uri] != "" {
      mutate {
        replace => { "t_aiu.key" => "additional_info_uri" "t_aiu.value.string_value" => "%{additional_info_uri}" }
        merge => { "event.idm.read_only_udm.additional.fields" => "t_aiu" }
      }
    }
  }

  # ===========================================================================
  # 3. Common metadata + finalize.
  # ===========================================================================
  mutate {
    replace => {
      "event.idm.read_only_udm.metadata.event_type" => "%{event_type}"
      "event.idm.read_only_udm.metadata.vendor_name" => "parsedmarc"
      "event.idm.read_only_udm.metadata.product_name" => "parsedmarc"
    }
  }
  if [report_type] != "" {
    mutate { replace => { "event.idm.read_only_udm.metadata.product_event_type" => "%{report_type}" } }
  }

  mutate { merge => { "@output" => "event" } }
}