diff --git a/.github/workflows/python-tests.yml b/.github/workflows/python-tests.yml index 1fbd216..578208f 100644 --- a/.github/workflows/python-tests.yml +++ b/.github/workflows/python-tests.yml @@ -11,13 +11,26 @@ on: jobs: build: - runs-on: ubuntu-latest + services: + elasticsearch: + image: elasticsearch:8.18.2 + env: + discovery.type: single-node + cluster.name: parsedmarc-cluster + discovery.seed_hosts: elasticsearch + bootstrap.memory_lock: true + xpack.security.enabled: false + xpack.license.self_generated.type: basic + ports: + - 9200:9200 + - 9300:9300 + strategy: fail-fast: false matrix: - python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"] + python-version: ["3.9", "3.10", "3.11", "3.12"] steps: - uses: actions/checkout@v4 @@ -29,13 +42,6 @@ jobs: run: | sudo apt-get update sudo apt-get install -y libemail-outlook-message-perl - wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | sudo gpg --dearmor -o /usr/share/keyrings/elasticsearch-keyring.gpg - sudo apt-get install apt-transport-https - echo "deb [signed-by=/usr/share/keyrings/elasticsearch-keyring.gpg] https://artifacts.elastic.co/packages/8.x/apt stable main" | sudo tee /etc/apt/sources.list.d/elastic-8.x.list - sudo apt-get update && sudo apt-get install elasticsearch - sudo sed -i 's/xpack.security.enabled: true/xpack.security.enabled: false/' /etc/elasticsearch/elasticsearch.yml - sudo systemctl restart elasticsearch - sudo systemctl --no-pager status elasticsearch - name: Install Python dependencies run: | python -m pip install --upgrade pip diff --git a/parsedmarc/elastic.py b/parsedmarc/elastic.py index 9835117..8bc0171 100644 --- a/parsedmarc/elastic.py +++ b/parsedmarc/elastic.py @@ -552,8 +552,8 @@ def save_forensic_report_to_elasticsearch( for original_header in original_headers: headers[original_header.lower()] = original_headers[original_header] - arrival_date_human = forensic_report["arrival_date_utc"] - arrival_date = human_timestamp_to_datetime(arrival_date_human) + arrival_date = human_timestamp_to_datetime(forensic_report["arrival_date_utc"]) + arrival_date_epoch_milliseconds = int(arrival_date.timestamp() * 1000) if index_suffix is not None: search_index = "dmarc_forensic_{0}*".format(index_suffix) @@ -562,28 +562,41 @@ def save_forensic_report_to_elasticsearch( if index_prefix is not None: search_index = "{0}{1}".format(index_prefix, search_index) search = Search(index=search_index) - arrival_query = {"match": {"arrival_date": arrival_date}} - q = Q(arrival_query) + q = Q(dict(match=dict(arrival_date=arrival_date_epoch_milliseconds))) from_ = None to_ = None subject = None if "from" in headers: - from_ = headers["from"] - from_query = {"match_phrase": {"sample.headers.from": from_}} - q = q & Q(from_query) + # We convert the FROM header from a string list to a flat string. + headers["from"] = headers["from"][0] + if headers["from"][0] == "": + headers["from"] = headers["from"][1] + else: + headers["from"] = " <".join(headers["from"]) + ">" + + from_ = dict() + from_["sample.headers.from"] = headers["from"] + from_query = Q(dict(match_phrase=from_)) + q = q & from_query if "to" in headers: - to_ = headers["to"] - to_query = {"match_phrase": {"sample.headers.to": to_}} - q = q & Q(to_query) + # We convert the TO header from a string list to a flat string. + headers["to"] = headers["to"][0] + if headers["to"][0] == "": + headers["to"] = headers["to"][1] + else: + headers["to"] = " <".join(headers["to"]) + ">" + + to_ = dict() + to_["sample.headers.to"] = headers["to"] + to_query = Q(dict(match_phrase=to_)) + q = q & to_query if "subject" in headers: subject = headers["subject"] subject_query = {"match_phrase": {"sample.headers.subject": subject}} q = q & Q(subject_query) search.query = q - - print(search.__str__()) existing = search.execute() if len(existing) > 0: @@ -591,7 +604,7 @@ def save_forensic_report_to_elasticsearch( "A forensic sample to {0} from {1} " "with a subject of {2} and arrival date of {3} " "already exists in " - "Elasticsearch".format(to_, from_, subject, arrival_date_human) + "Elasticsearch".format(to_, from_, subject, forensic_report["arrival_date_utc"]) ) parsed_sample = forensic_report["parsed_sample"] @@ -627,7 +640,7 @@ def save_forensic_report_to_elasticsearch( user_agent=forensic_report["user_agent"], version=forensic_report["version"], original_mail_from=forensic_report["original_mail_from"], - arrival_date=arrival_date, + arrival_date=arrival_date_epoch_milliseconds, domain=forensic_report["reported_domain"], original_envelope_id=forensic_report["original_envelope_id"], authentication_results=forensic_report["authentication_results"], diff --git a/parsedmarc/opensearch.py b/parsedmarc/opensearch.py index 43ffff8..7d51e5f 100644 --- a/parsedmarc/opensearch.py +++ b/parsedmarc/opensearch.py @@ -552,8 +552,8 @@ def save_forensic_report_to_opensearch( for original_header in original_headers: headers[original_header.lower()] = original_headers[original_header] - arrival_date_human = forensic_report["arrival_date_utc"] - arrival_date = human_timestamp_to_datetime(arrival_date_human) + arrival_date = human_timestamp_to_datetime(forensic_report["arrival_date_utc"]) + arrival_date_epoch_milliseconds = int(arrival_date.timestamp() * 1000) if index_suffix is not None: search_index = "dmarc_forensic_{0}*".format(index_suffix) @@ -562,20 +562,35 @@ def save_forensic_report_to_opensearch( if index_prefix is not None: search_index = "{0}{1}".format(index_prefix, search_index) search = Search(index=search_index) - arrival_query = {"match": {"arrival_date": arrival_date}} - q = Q(arrival_query) + q = Q(dict(match=dict(arrival_date=arrival_date_epoch_milliseconds))) from_ = None to_ = None subject = None if "from" in headers: - from_ = headers["from"] - from_query = {"match_phrase": {"sample.headers.from": from_}} - q = q & Q(from_query) + # We convert the FROM header from a string list to a flat string. + headers["from"] = headers["from"][0] + if headers["from"][0] == "": + headers["from"] = headers["from"][1] + else: + headers["from"] = " <".join(headers["from"]) + ">" + + from_ = dict() + from_["sample.headers.from"] = headers["from"] + from_query = Q(dict(match_phrase=from_)) + q = q & from_query if "to" in headers: - to_ = headers["to"] - to_query = {"match_phrase": {"sample.headers.to": to_}} - q = q & Q(to_query) + # We convert the TO header from a string list to a flat string. + headers["to"] = headers["to"][0] + if headers["to"][0] == "": + headers["to"] = headers["to"][1] + else: + headers["to"] = " <".join(headers["to"]) + ">" + + to_ = dict() + to_["sample.headers.to"] = headers["to"] + to_query = Q(dict(match_phrase=to_)) + q = q & to_query if "subject" in headers: subject = headers["subject"] subject_query = {"match_phrase": {"sample.headers.subject": subject}} @@ -589,7 +604,7 @@ def save_forensic_report_to_opensearch( "A forensic sample to {0} from {1} " "with a subject of {2} and arrival date of {3} " "already exists in " - "OpenSearch".format(to_, from_, subject, arrival_date_human) + "OpenSearch".format(to_, from_, subject, forensic_report["arrival_date_utc"]) ) parsed_sample = forensic_report["parsed_sample"] @@ -625,7 +640,7 @@ def save_forensic_report_to_opensearch( user_agent=forensic_report["user_agent"], version=forensic_report["version"], original_mail_from=forensic_report["original_mail_from"], - arrival_date=arrival_date, + arrival_date=arrival_date_epoch_milliseconds, domain=forensic_report["reported_domain"], original_envelope_id=forensic_report["original_envelope_id"], authentication_results=forensic_report["authentication_results"],