fixing ES/OS forensic report lookup and storage, extracting ES to separate CI service (#603)

* fixing ES/OS forensic report lookup and storage, extracting ES to separate CI service

* bumping CI ES version to current latest

* reshuffling CI job attributes

* removing EOL Python 3.8 from the CI pipeline
This commit is contained in:
Szasza Palmer
2025-06-03 01:10:10 +10:00
committed by GitHub
parent 4c04418dae
commit e299f7d161
3 changed files with 69 additions and 35 deletions
+15 -9
View File
@@ -11,13 +11,26 @@ on:
jobs:
build:
runs-on: ubuntu-latest
services:
elasticsearch:
image: elasticsearch:8.18.2
env:
discovery.type: single-node
cluster.name: parsedmarc-cluster
discovery.seed_hosts: elasticsearch
bootstrap.memory_lock: true
xpack.security.enabled: false
xpack.license.self_generated.type: basic
ports:
- 9200:9200
- 9300:9300
strategy:
fail-fast: false
matrix:
python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"]
python-version: ["3.9", "3.10", "3.11", "3.12"]
steps:
- uses: actions/checkout@v4
@@ -29,13 +42,6 @@ jobs:
run: |
sudo apt-get update
sudo apt-get install -y libemail-outlook-message-perl
wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | sudo gpg --dearmor -o /usr/share/keyrings/elasticsearch-keyring.gpg
sudo apt-get install apt-transport-https
echo "deb [signed-by=/usr/share/keyrings/elasticsearch-keyring.gpg] https://artifacts.elastic.co/packages/8.x/apt stable main" | sudo tee /etc/apt/sources.list.d/elastic-8.x.list
sudo apt-get update && sudo apt-get install elasticsearch
sudo sed -i 's/xpack.security.enabled: true/xpack.security.enabled: false/' /etc/elasticsearch/elasticsearch.yml
sudo systemctl restart elasticsearch
sudo systemctl --no-pager status elasticsearch
- name: Install Python dependencies
run: |
python -m pip install --upgrade pip
+27 -14
View File
@@ -552,8 +552,8 @@ def save_forensic_report_to_elasticsearch(
for original_header in original_headers:
headers[original_header.lower()] = original_headers[original_header]
arrival_date_human = forensic_report["arrival_date_utc"]
arrival_date = human_timestamp_to_datetime(arrival_date_human)
arrival_date = human_timestamp_to_datetime(forensic_report["arrival_date_utc"])
arrival_date_epoch_milliseconds = int(arrival_date.timestamp() * 1000)
if index_suffix is not None:
search_index = "dmarc_forensic_{0}*".format(index_suffix)
@@ -562,28 +562,41 @@ def save_forensic_report_to_elasticsearch(
if index_prefix is not None:
search_index = "{0}{1}".format(index_prefix, search_index)
search = Search(index=search_index)
arrival_query = {"match": {"arrival_date": arrival_date}}
q = Q(arrival_query)
q = Q(dict(match=dict(arrival_date=arrival_date_epoch_milliseconds)))
from_ = None
to_ = None
subject = None
if "from" in headers:
from_ = headers["from"]
from_query = {"match_phrase": {"sample.headers.from": from_}}
q = q & Q(from_query)
# We convert the FROM header from a string list to a flat string.
headers["from"] = headers["from"][0]
if headers["from"][0] == "":
headers["from"] = headers["from"][1]
else:
headers["from"] = " <".join(headers["from"]) + ">"
from_ = dict()
from_["sample.headers.from"] = headers["from"]
from_query = Q(dict(match_phrase=from_))
q = q & from_query
if "to" in headers:
to_ = headers["to"]
to_query = {"match_phrase": {"sample.headers.to": to_}}
q = q & Q(to_query)
# We convert the TO header from a string list to a flat string.
headers["to"] = headers["to"][0]
if headers["to"][0] == "":
headers["to"] = headers["to"][1]
else:
headers["to"] = " <".join(headers["to"]) + ">"
to_ = dict()
to_["sample.headers.to"] = headers["to"]
to_query = Q(dict(match_phrase=to_))
q = q & to_query
if "subject" in headers:
subject = headers["subject"]
subject_query = {"match_phrase": {"sample.headers.subject": subject}}
q = q & Q(subject_query)
search.query = q
print(search.__str__())
existing = search.execute()
if len(existing) > 0:
@@ -591,7 +604,7 @@ def save_forensic_report_to_elasticsearch(
"A forensic sample to {0} from {1} "
"with a subject of {2} and arrival date of {3} "
"already exists in "
"Elasticsearch".format(to_, from_, subject, arrival_date_human)
"Elasticsearch".format(to_, from_, subject, forensic_report["arrival_date_utc"])
)
parsed_sample = forensic_report["parsed_sample"]
@@ -627,7 +640,7 @@ def save_forensic_report_to_elasticsearch(
user_agent=forensic_report["user_agent"],
version=forensic_report["version"],
original_mail_from=forensic_report["original_mail_from"],
arrival_date=arrival_date,
arrival_date=arrival_date_epoch_milliseconds,
domain=forensic_report["reported_domain"],
original_envelope_id=forensic_report["original_envelope_id"],
authentication_results=forensic_report["authentication_results"],
+27 -12
View File
@@ -552,8 +552,8 @@ def save_forensic_report_to_opensearch(
for original_header in original_headers:
headers[original_header.lower()] = original_headers[original_header]
arrival_date_human = forensic_report["arrival_date_utc"]
arrival_date = human_timestamp_to_datetime(arrival_date_human)
arrival_date = human_timestamp_to_datetime(forensic_report["arrival_date_utc"])
arrival_date_epoch_milliseconds = int(arrival_date.timestamp() * 1000)
if index_suffix is not None:
search_index = "dmarc_forensic_{0}*".format(index_suffix)
@@ -562,20 +562,35 @@ def save_forensic_report_to_opensearch(
if index_prefix is not None:
search_index = "{0}{1}".format(index_prefix, search_index)
search = Search(index=search_index)
arrival_query = {"match": {"arrival_date": arrival_date}}
q = Q(arrival_query)
q = Q(dict(match=dict(arrival_date=arrival_date_epoch_milliseconds)))
from_ = None
to_ = None
subject = None
if "from" in headers:
from_ = headers["from"]
from_query = {"match_phrase": {"sample.headers.from": from_}}
q = q & Q(from_query)
# We convert the FROM header from a string list to a flat string.
headers["from"] = headers["from"][0]
if headers["from"][0] == "":
headers["from"] = headers["from"][1]
else:
headers["from"] = " <".join(headers["from"]) + ">"
from_ = dict()
from_["sample.headers.from"] = headers["from"]
from_query = Q(dict(match_phrase=from_))
q = q & from_query
if "to" in headers:
to_ = headers["to"]
to_query = {"match_phrase": {"sample.headers.to": to_}}
q = q & Q(to_query)
# We convert the TO header from a string list to a flat string.
headers["to"] = headers["to"][0]
if headers["to"][0] == "":
headers["to"] = headers["to"][1]
else:
headers["to"] = " <".join(headers["to"]) + ">"
to_ = dict()
to_["sample.headers.to"] = headers["to"]
to_query = Q(dict(match_phrase=to_))
q = q & to_query
if "subject" in headers:
subject = headers["subject"]
subject_query = {"match_phrase": {"sample.headers.subject": subject}}
@@ -589,7 +604,7 @@ def save_forensic_report_to_opensearch(
"A forensic sample to {0} from {1} "
"with a subject of {2} and arrival date of {3} "
"already exists in "
"OpenSearch".format(to_, from_, subject, arrival_date_human)
"OpenSearch".format(to_, from_, subject, forensic_report["arrival_date_utc"])
)
parsed_sample = forensic_report["parsed_sample"]
@@ -625,7 +640,7 @@ def save_forensic_report_to_opensearch(
user_agent=forensic_report["user_agent"],
version=forensic_report["version"],
original_mail_from=forensic_report["original_mail_from"],
arrival_date=arrival_date,
arrival_date=arrival_date_epoch_milliseconds,
domain=forensic_report["reported_domain"],
original_envelope_id=forensic_report["original_envelope_id"],
authentication_results=forensic_report["authentication_results"],