mirror of
https://github.com/domainaware/parsedmarc.git
synced 2026-02-17 07:03:58 +00:00
adding OpenSearch support, fixing minor typos, and code styling (#481)
* adding OpenSearch support, fixing minor typos and code styling * documentation update
This commit is contained in:
@@ -14,6 +14,14 @@
|
||||
:members:
|
||||
```
|
||||
|
||||
## parsedmarc.opensearch
|
||||
|
||||
```{eval-rst}
|
||||
.. automodule:: parsedmarc.opensearch
|
||||
:members:
|
||||
```
|
||||
|
||||
|
||||
## parsedmarc.splunk
|
||||
|
||||
```{eval-rst}
|
||||
|
||||
@@ -26,7 +26,7 @@ Thanks to all [contributors]!
|
||||
```
|
||||
|
||||
`parsedmarc` is a Python module and CLI utility for parsing DMARC reports.
|
||||
When used with Elasticsearch and Kibana (or Splunk), it works as a self-hosted
|
||||
When used with Elasticsearch and Kibana (or Splunk), or with OpenSearch and Grafana, it works as a self-hosted
|
||||
open source alternative to commercial DMARC report processing services such
|
||||
as Agari Brand Protection, Dmarcian, OnDMARC, ProofPoint Email Fraud Defense,
|
||||
and Valimail.
|
||||
@@ -40,7 +40,7 @@ and Valimail.
|
||||
- Consistent data structures
|
||||
- Simple JSON and/or CSV output
|
||||
- Optionally email the results
|
||||
- Optionally send the results to Elasticsearch and/or Splunk, for use with
|
||||
- Optionally send the results to Elasticsearch/OpenSearch and/or Splunk, for use with
|
||||
premade dashboards
|
||||
- Optionally send reports to Apache Kafka
|
||||
|
||||
@@ -52,6 +52,7 @@ installation
|
||||
usage
|
||||
output
|
||||
elasticsearch
|
||||
opensearch
|
||||
kibana
|
||||
splunk
|
||||
davmail
|
||||
|
||||
14
docs/source/opensearch.md
Normal file
14
docs/source/opensearch.md
Normal file
@@ -0,0 +1,14 @@
|
||||
# OpenSearch and Grafana
|
||||
|
||||
To set up visual dashboards of DMARC data, install OpenSearch and Grafana.
|
||||
|
||||
## Installation
|
||||
|
||||
OpenSearch: https://opensearch.org/docs/latest/install-and-configure/install-opensearch/index/
|
||||
Grafana: https://grafana.com/docs/grafana/latest/setup-grafana/installation/
|
||||
|
||||
## Records retention
|
||||
|
||||
Starting in version 5.0.0, `parsedmarc` stores data in a separate
|
||||
index for each day to make it easy to comply with records
|
||||
retention regulations such as GDPR.
|
||||
@@ -82,6 +82,10 @@ delete = False
|
||||
hosts = 127.0.0.1:9200
|
||||
ssl = False
|
||||
|
||||
[opensearch]
|
||||
hosts = https://admin:admin@127.0.0.1:9200
|
||||
ssl = True
|
||||
|
||||
[splunk_hec]
|
||||
url = https://splunkhec.example.com
|
||||
token = HECTokenGoesHere
|
||||
@@ -238,6 +242,28 @@ The full set of configuration options are:
|
||||
creating the index (Default: `1`)
|
||||
- `number_of_replicas` - int: The number of replicas to use when
|
||||
creating the index (Default: `0`)
|
||||
- `opensearch`
|
||||
- `hosts` - str: A comma separated list of hostnames and ports
|
||||
or URLs (e.g. `127.0.0.1:9200` or
|
||||
`https://user:secret@localhost`)
|
||||
|
||||
:::{note}
|
||||
Special characters in the username or password must be
|
||||
[URL encoded].
|
||||
:::
|
||||
- `user` - str: Basic auth username
|
||||
- `password` - str: Basic auth password
|
||||
- `apiKey` - str: API key
|
||||
- `ssl` - bool: Use an encrypted SSL/TLS connection
|
||||
(Default: `True`)
|
||||
- `timeout` - float: Timeout in seconds (Default: 60)
|
||||
- `cert_path` - str: Path to a trusted certificates
|
||||
- `index_suffix` - str: A suffix to apply to the index names
|
||||
- `monthly_indexes` - bool: Use monthly indexes instead of daily indexes
|
||||
- `number_of_shards` - int: The number of shards to use when
|
||||
creating the index (Default: `1`)
|
||||
- `number_of_replicas` - int: The number of replicas to use when
|
||||
creating the index (Default: `0`)
|
||||
- `splunk_hec`
|
||||
- `url` - str: The URL of the Splunk HTTP Events Collector (HEC)
|
||||
- `token` - str: The HEC token
|
||||
|
||||
@@ -18,7 +18,7 @@ import time
|
||||
from tqdm import tqdm
|
||||
|
||||
from parsedmarc import get_dmarc_reports_from_mailbox, watch_inbox, \
|
||||
parse_report_file, get_dmarc_reports_from_mbox, elastic, kafkaclient, \
|
||||
parse_report_file, get_dmarc_reports_from_mbox, elastic, opensearch, kafkaclient, \
|
||||
splunk, save_output, email_results, ParserError, __version__, \
|
||||
InvalidDMARCReport, s3, syslog, loganalytics
|
||||
|
||||
@@ -106,6 +106,27 @@ def _main():
|
||||
except Exception as error_:
|
||||
logger.error("Elasticsearch exception error: {}".format(
|
||||
error_.__str__()))
|
||||
|
||||
try:
|
||||
if opts.opensearch_hosts:
|
||||
shards = opts.opensearch_number_of_shards
|
||||
replicas = opts.opensearch_number_of_replicas
|
||||
opensearch.save_aggregate_report_to_opensearch(
|
||||
report,
|
||||
index_suffix=opts.opensearch_index_suffix,
|
||||
monthly_indexes=opts.opensearch_monthly_indexes,
|
||||
number_of_shards=shards,
|
||||
number_of_replicas=replicas
|
||||
)
|
||||
except opensearch.AlreadySaved as warning:
|
||||
logger.warning(warning.__str__())
|
||||
except opensearch.OpenSearchError as error_:
|
||||
logger.error("OpenSearch Error: {0}".format(
|
||||
error_.__str__()))
|
||||
except Exception as error_:
|
||||
logger.error("OpenSearch exception error: {}".format(
|
||||
error_.__str__()))
|
||||
|
||||
try:
|
||||
if opts.kafka_hosts:
|
||||
kafka_client.save_aggregate_reports_to_kafka(
|
||||
@@ -113,16 +134,19 @@ def _main():
|
||||
except Exception as error_:
|
||||
logger.error("Kafka Error: {0}".format(
|
||||
error_.__str__()))
|
||||
|
||||
try:
|
||||
if opts.s3_bucket:
|
||||
s3_client.save_aggregate_report_to_s3(report)
|
||||
except Exception as error_:
|
||||
logger.error("S3 Error: {0}".format(error_.__str__()))
|
||||
|
||||
try:
|
||||
if opts.syslog_server:
|
||||
syslog_client.save_aggregate_report_to_syslog(report)
|
||||
except Exception as error_:
|
||||
logger.error("Syslog Error: {0}".format(error_.__str__()))
|
||||
|
||||
if opts.hec:
|
||||
try:
|
||||
aggregate_reports_ = reports_["aggregate_reports"]
|
||||
@@ -131,6 +155,7 @@ def _main():
|
||||
aggregate_reports_)
|
||||
except splunk.SplunkError as e:
|
||||
logger.error("Splunk HEC error: {0}".format(e.__str__()))
|
||||
|
||||
if opts.save_forensic:
|
||||
for report in reports_["forensic_reports"]:
|
||||
try:
|
||||
@@ -150,6 +175,25 @@ def _main():
|
||||
error_.__str__()))
|
||||
except InvalidDMARCReport as error_:
|
||||
logger.error(error_.__str__())
|
||||
|
||||
try:
|
||||
shards = opts.opensearch_number_of_shards
|
||||
replicas = opts.opensearch_number_of_replicas
|
||||
if opts.opensearch_hosts:
|
||||
opensearch.save_forensic_report_to_opensearch(
|
||||
report,
|
||||
index_suffix=opts.opensearch_index_suffix,
|
||||
monthly_indexes=opts.opensearch_monthly_indexes,
|
||||
number_of_shards=shards,
|
||||
number_of_replicas=replicas)
|
||||
except opensearch.AlreadySaved as warning:
|
||||
logger.warning(warning.__str__())
|
||||
except opensearch.OpenSearchError as error_:
|
||||
logger.error("OpenSearch Error: {0}".format(
|
||||
error_.__str__()))
|
||||
except InvalidDMARCReport as error_:
|
||||
logger.error(error_.__str__())
|
||||
|
||||
try:
|
||||
if opts.kafka_hosts:
|
||||
kafka_client.save_forensic_reports_to_kafka(
|
||||
@@ -157,16 +201,19 @@ def _main():
|
||||
except Exception as error_:
|
||||
logger.error("Kafka Error: {0}".format(
|
||||
error_.__str__()))
|
||||
|
||||
try:
|
||||
if opts.s3_bucket:
|
||||
s3_client.save_forensic_report_to_s3(report)
|
||||
except Exception as error_:
|
||||
logger.error("S3 Error: {0}".format(error_.__str__()))
|
||||
|
||||
try:
|
||||
if opts.syslog_server:
|
||||
syslog_client.save_forensic_report_to_syslog(report)
|
||||
except Exception as error_:
|
||||
logger.error("Syslog Error: {0}".format(error_.__str__()))
|
||||
|
||||
if opts.hec:
|
||||
try:
|
||||
forensic_reports_ = reports_["forensic_reports"]
|
||||
@@ -175,6 +222,7 @@ def _main():
|
||||
forensic_reports_)
|
||||
except splunk.SplunkError as e:
|
||||
logger.error("Splunk HEC error: {0}".format(e.__str__()))
|
||||
|
||||
if opts.save_smtp_tls:
|
||||
for report in reports_["smtp_tls_reports"]:
|
||||
try:
|
||||
@@ -194,6 +242,25 @@ def _main():
|
||||
error_.__str__()))
|
||||
except InvalidDMARCReport as error_:
|
||||
logger.error(error_.__str__())
|
||||
|
||||
try:
|
||||
shards = opts.opensearch_number_of_shards
|
||||
replicas = opts.opensearch_number_of_replicas
|
||||
if opts.opensearch_hosts:
|
||||
opensearch.save_smtp_tls_report_to_opensearch(
|
||||
report,
|
||||
index_suffix=opts.opensearch_index_suffix,
|
||||
monthly_indexes=opts.opensearch_monthly_indexes,
|
||||
number_of_shards=shards,
|
||||
number_of_replicas=replicas)
|
||||
except opensearch.AlreadySaved as warning:
|
||||
logger.warning(warning.__str__())
|
||||
except opensearch.OpenSearchError as error_:
|
||||
logger.error("OpenSearch Error: {0}".format(
|
||||
error_.__str__()))
|
||||
except InvalidDMARCReport as error_:
|
||||
logger.error(error_.__str__())
|
||||
|
||||
try:
|
||||
if opts.kafka_hosts:
|
||||
kafka_client.save_smtp_tls_reports_to_kafka(
|
||||
@@ -201,16 +268,19 @@ def _main():
|
||||
except Exception as error_:
|
||||
logger.error("Kafka Error: {0}".format(
|
||||
error_.__str__()))
|
||||
|
||||
try:
|
||||
if opts.s3_bucket:
|
||||
s3_client.save_smtp_tls_report_to_s3(report)
|
||||
except Exception as error_:
|
||||
logger.error("S3 Error: {0}".format(error_.__str__()))
|
||||
|
||||
try:
|
||||
if opts.syslog_server:
|
||||
syslog_client.save_smtp_tls_report_to_syslog(report)
|
||||
except Exception as error_:
|
||||
logger.error("Syslog Error: {0}".format(error_.__str__()))
|
||||
|
||||
if opts.hec:
|
||||
try:
|
||||
smtp_tls_reports_ = reports_["smtp_tls_reports"]
|
||||
@@ -219,6 +289,7 @@ def _main():
|
||||
smtp_tls_reports_)
|
||||
except splunk.SplunkError as e:
|
||||
logger.error("Splunk HEC error: {0}".format(e.__str__()))
|
||||
|
||||
if opts.la_dce:
|
||||
try:
|
||||
la_client = loganalytics.LogAnalyticsClient(
|
||||
@@ -366,6 +437,17 @@ def _main():
|
||||
elasticsearch_username=None,
|
||||
elasticsearch_password=None,
|
||||
elasticsearch_apiKey=None,
|
||||
opensearch_hosts=None,
|
||||
opensearch_timeout=60,
|
||||
opensearch_number_of_shards=1,
|
||||
opensearch_number_of_replicas=0,
|
||||
opensearch_index_suffix=None,
|
||||
opensearch_ssl=True,
|
||||
opensearch_ssl_cert_path=None,
|
||||
opensearch_monthly_indexes=False,
|
||||
opensearch_username=None,
|
||||
opensearch_password=None,
|
||||
opensearch_apiKey=None,
|
||||
kafka_hosts=None,
|
||||
kafka_username=None,
|
||||
kafka_password=None,
|
||||
@@ -674,6 +756,49 @@ def _main():
|
||||
if "apiKey" in elasticsearch_config:
|
||||
opts.elasticsearch_apiKey = elasticsearch_config[
|
||||
"apiKey"]
|
||||
|
||||
if "opensearch" in config:
|
||||
opensearch_config = config["opensearch"]
|
||||
if "hosts" in opensearch_config:
|
||||
opts.opensearch_hosts = _str_to_list(opensearch_config[
|
||||
"hosts"])
|
||||
else:
|
||||
logger.critical("hosts setting missing from the "
|
||||
"opensearch config section")
|
||||
exit(-1)
|
||||
if "timeout" in opensearch_config:
|
||||
timeout = opensearch_config.getfloat("timeout")
|
||||
opts.opensearch_timeout = timeout
|
||||
if "number_of_shards" in opensearch_config:
|
||||
number_of_shards = opensearch_config.getint(
|
||||
"number_of_shards")
|
||||
opts.opensearch_number_of_shards = number_of_shards
|
||||
if "number_of_replicas" in opensearch_config:
|
||||
number_of_replicas = opensearch_config.getint(
|
||||
"number_of_replicas")
|
||||
opts.opensearch_number_of_replicas = number_of_replicas
|
||||
if "index_suffix" in opensearch_config:
|
||||
opts.opensearch_index_suffix = opensearch_config[
|
||||
"index_suffix"]
|
||||
if "monthly_indexes" in opensearch_config:
|
||||
monthly = opensearch_config.getboolean("monthly_indexes")
|
||||
opts.opensearch_monthly_indexes = monthly
|
||||
if "ssl" in opensearch_config:
|
||||
opts.opensearch_ssl = opensearch_config.getboolean(
|
||||
"ssl")
|
||||
if "cert_path" in opensearch_config:
|
||||
opts.opensearch_ssl_cert_path = opensearch_config[
|
||||
"cert_path"]
|
||||
if "user" in opensearch_config:
|
||||
opts.opensearch_username = opensearch_config[
|
||||
"user"]
|
||||
if "password" in opensearch_config:
|
||||
opts.opensearch_password = opensearch_config[
|
||||
"password"]
|
||||
if "apiKey" in opensearch_config:
|
||||
opts.opensearch_apiKey = opensearch_config[
|
||||
"apiKey"]
|
||||
|
||||
if "splunk_hec" in config.sections():
|
||||
hec_config = config["splunk_hec"]
|
||||
if "url" in hec_config:
|
||||
@@ -697,6 +822,7 @@ def _main():
|
||||
if "skip_certificate_verification" in hec_config:
|
||||
opts.hec_skip_certificate_verification = hec_config[
|
||||
"skip_certificate_verification"]
|
||||
|
||||
if "kafka" in config.sections():
|
||||
kafka_config = config["kafka"]
|
||||
if "hosts" in kafka_config:
|
||||
@@ -739,6 +865,7 @@ def _main():
|
||||
else:
|
||||
logger.critical("forensic_topic setting missing from the "
|
||||
"splunk_hec config section")
|
||||
|
||||
if "smtp" in config.sections():
|
||||
smtp_config = config["smtp"]
|
||||
if "host" in smtp_config:
|
||||
@@ -783,6 +910,7 @@ def _main():
|
||||
opts.smtp_attachment = smtp_config["attachment"]
|
||||
if "message" in smtp_config:
|
||||
opts.smtp_message = smtp_config["message"]
|
||||
|
||||
if "s3" in config.sections():
|
||||
s3_config = config["s3"]
|
||||
if "bucket" in s3_config:
|
||||
@@ -840,6 +968,7 @@ def _main():
|
||||
if "oauth2_port" in gmail_api_config:
|
||||
opts.gmail_api_oauth2_port = \
|
||||
gmail_api_config.get("oauth2_port", 8080)
|
||||
|
||||
if "log_analytics" in config.sections():
|
||||
log_analytics_config = config["log_analytics"]
|
||||
opts.la_client_id = \
|
||||
@@ -917,6 +1046,33 @@ def _main():
|
||||
logger.exception("Elasticsearch Error")
|
||||
exit(1)
|
||||
|
||||
try:
|
||||
if opts.opensearch_hosts:
|
||||
os_aggregate_index = "dmarc_aggregate"
|
||||
os_forensic_index = "dmarc_forensic"
|
||||
os_smtp_tls_index = "smtp_tls"
|
||||
if opts.opensearch_index_suffix:
|
||||
suffix = opts.opensearch_index_suffix
|
||||
os_aggregate_index = "{0}_{1}".format(
|
||||
os_aggregate_index, suffix)
|
||||
os_forensic_index = "{0}_{1}".format(
|
||||
os_forensic_index, suffix)
|
||||
os_smtp_tls_index = "{0}_{1}".format(
|
||||
os_smtp_tls_index, suffix
|
||||
)
|
||||
opensearch.set_hosts(opts.opensearch_hosts,
|
||||
opts.opensearch_ssl,
|
||||
opts.opensearch_ssl_cert_path,
|
||||
opts.opensearch_username,
|
||||
opts.opensearch_password,
|
||||
opts.opensearch_apiKey,
|
||||
timeout=opts.opensearch_timeout)
|
||||
opensearch.migrate_indexes(aggregate_indexes=[os_aggregate_index],
|
||||
forensic_indexes=[os_forensic_index])
|
||||
except opensearch.OpenSearchError:
|
||||
logger.exception("OpenSearch Error")
|
||||
exit(1)
|
||||
|
||||
if opts.s3_bucket:
|
||||
try:
|
||||
s3_client = s3.S3Client(
|
||||
|
||||
@@ -349,7 +349,7 @@ def save_aggregate_report_to_elasticsearch(aggregate_report,
|
||||
number_of_shards=1,
|
||||
number_of_replicas=0):
|
||||
"""
|
||||
Saves a parsed DMARC aggregate report to ElasticSearch
|
||||
Saves a parsed DMARC aggregate report to Elasticsearch
|
||||
|
||||
Args:
|
||||
aggregate_report (OrderedDict): A parsed forensic report
|
||||
@@ -484,7 +484,7 @@ def save_forensic_report_to_elasticsearch(forensic_report,
|
||||
number_of_shards=1,
|
||||
number_of_replicas=0):
|
||||
"""
|
||||
Saves a parsed DMARC forensic report to ElasticSearch
|
||||
Saves a parsed DMARC forensic report to Elasticsearch
|
||||
|
||||
Args:
|
||||
forensic_report (OrderedDict): A parsed forensic report
|
||||
@@ -627,7 +627,7 @@ def save_smtp_tls_report_to_elasticsearch(report,
|
||||
number_of_shards=1,
|
||||
number_of_replicas=0):
|
||||
"""
|
||||
Saves a parsed SMTP TLS report to elasticSearch
|
||||
Saves a parsed SMTP TLS report to Elasticsearch
|
||||
|
||||
Args:
|
||||
report (OrderedDict): A parsed SMTP TLS report
|
||||
|
||||
744
parsedmarc/opensearch.py
Normal file
744
parsedmarc/opensearch.py
Normal file
@@ -0,0 +1,744 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
from collections import OrderedDict
|
||||
|
||||
from opensearchpy import Q, connections, Object, Document, Index, Nested, \
|
||||
InnerDoc, Integer, Text, Boolean, Ip, Date, Search
|
||||
from opensearchpy.helpers import reindex
|
||||
|
||||
from parsedmarc.log import logger
|
||||
from parsedmarc.utils import human_timestamp_to_datetime
|
||||
from parsedmarc import InvalidForensicReport
|
||||
|
||||
|
||||
class OpenSearchError(Exception):
|
||||
"""Raised when an OpenSearch error occurs"""
|
||||
|
||||
|
||||
class _PolicyOverride(InnerDoc):
|
||||
type = Text()
|
||||
comment = Text()
|
||||
|
||||
|
||||
class _PublishedPolicy(InnerDoc):
|
||||
domain = Text()
|
||||
adkim = Text()
|
||||
aspf = Text()
|
||||
p = Text()
|
||||
sp = Text()
|
||||
pct = Integer()
|
||||
fo = Text()
|
||||
|
||||
|
||||
class _DKIMResult(InnerDoc):
|
||||
domain = Text()
|
||||
selector = Text()
|
||||
result = Text()
|
||||
|
||||
|
||||
class _SPFResult(InnerDoc):
|
||||
domain = Text()
|
||||
scope = Text()
|
||||
results = Text()
|
||||
|
||||
|
||||
class _AggregateReportDoc(Document):
|
||||
class Index:
|
||||
name = "dmarc_aggregate"
|
||||
|
||||
xml_schema = Text()
|
||||
org_name = Text()
|
||||
org_email = Text()
|
||||
org_extra_contact_info = Text()
|
||||
report_id = Text()
|
||||
date_range = Date()
|
||||
date_begin = Date()
|
||||
date_end = Date()
|
||||
errors = Text()
|
||||
published_policy = Object(_PublishedPolicy)
|
||||
source_ip_address = Ip()
|
||||
source_country = Text()
|
||||
source_reverse_dns = Text()
|
||||
source_Base_domain = Text()
|
||||
message_count = Integer
|
||||
disposition = Text()
|
||||
dkim_aligned = Boolean()
|
||||
spf_aligned = Boolean()
|
||||
passed_dmarc = Boolean()
|
||||
policy_overrides = Nested(_PolicyOverride)
|
||||
header_from = Text()
|
||||
envelope_from = Text()
|
||||
envelope_to = Text()
|
||||
dkim_results = Nested(_DKIMResult)
|
||||
spf_results = Nested(_SPFResult)
|
||||
|
||||
def add_policy_override(self, type_, comment):
|
||||
self.policy_overrides.append(_PolicyOverride(type=type_,
|
||||
comment=comment))
|
||||
|
||||
def add_dkim_result(self, domain, selector, result):
|
||||
self.dkim_results.append(_DKIMResult(domain=domain,
|
||||
selector=selector,
|
||||
result=result))
|
||||
|
||||
def add_spf_result(self, domain, scope, result):
|
||||
self.spf_results.append(_SPFResult(domain=domain,
|
||||
scope=scope,
|
||||
result=result))
|
||||
|
||||
def save(self, ** kwargs):
|
||||
self.passed_dmarc = False
|
||||
self.passed_dmarc = self.spf_aligned or self.dkim_aligned
|
||||
|
||||
return super().save(** kwargs)
|
||||
|
||||
|
||||
class _EmailAddressDoc(InnerDoc):
|
||||
display_name = Text()
|
||||
address = Text()
|
||||
|
||||
|
||||
class _EmailAttachmentDoc(Document):
|
||||
filename = Text()
|
||||
content_type = Text()
|
||||
sha256 = Text()
|
||||
|
||||
|
||||
class _ForensicSampleDoc(InnerDoc):
|
||||
raw = Text()
|
||||
headers = Object()
|
||||
headers_only = Boolean()
|
||||
to = Nested(_EmailAddressDoc)
|
||||
subject = Text()
|
||||
filename_safe_subject = Text()
|
||||
_from = Object(_EmailAddressDoc)
|
||||
date = Date()
|
||||
reply_to = Nested(_EmailAddressDoc)
|
||||
cc = Nested(_EmailAddressDoc)
|
||||
bcc = Nested(_EmailAddressDoc)
|
||||
body = Text()
|
||||
attachments = Nested(_EmailAttachmentDoc)
|
||||
|
||||
def add_to(self, display_name, address):
|
||||
self.to.append(_EmailAddressDoc(display_name=display_name,
|
||||
address=address))
|
||||
|
||||
def add_reply_to(self, display_name, address):
|
||||
self.reply_to.append(_EmailAddressDoc(display_name=display_name,
|
||||
address=address))
|
||||
|
||||
def add_cc(self, display_name, address):
|
||||
self.cc.append(_EmailAddressDoc(display_name=display_name,
|
||||
address=address))
|
||||
|
||||
def add_bcc(self, display_name, address):
|
||||
self.bcc.append(_EmailAddressDoc(display_name=display_name,
|
||||
address=address))
|
||||
|
||||
def add_attachment(self, filename, content_type, sha256):
|
||||
self.attachments.append(_EmailAttachmentDoc(filename=filename,
|
||||
content_type=content_type, sha256=sha256))
|
||||
|
||||
|
||||
class _ForensicReportDoc(Document):
|
||||
class Index:
|
||||
name = "dmarc_forensic"
|
||||
|
||||
feedback_type = Text()
|
||||
user_agent = Text()
|
||||
version = Text()
|
||||
original_mail_from = Text()
|
||||
arrival_date = Date()
|
||||
domain = Text()
|
||||
original_envelope_id = Text()
|
||||
authentication_results = Text()
|
||||
delivery_results = Text()
|
||||
source_ip_address = Ip()
|
||||
source_country = Text()
|
||||
source_reverse_dns = Text()
|
||||
source_authentication_mechanisms = Text()
|
||||
source_auth_failures = Text()
|
||||
dkim_domain = Text()
|
||||
original_rcpt_to = Text()
|
||||
sample = Object(_ForensicSampleDoc)
|
||||
|
||||
|
||||
class _SMTPTLSFailureDetailsDoc(InnerDoc):
|
||||
result_type = Text()
|
||||
sending_mta_ip = Ip()
|
||||
receiving_mx_helo = Text()
|
||||
receiving_ip = Ip()
|
||||
failed_session_count = Integer()
|
||||
additional_information_uri = Text()
|
||||
failure_reason_code = Text()
|
||||
|
||||
|
||||
class _SMTPTLSPolicyDoc(InnerDoc):
|
||||
policy_domain = Text()
|
||||
policy_type = Text()
|
||||
policy_strings = Text()
|
||||
mx_host_patterns = Text()
|
||||
successful_session_count = Integer()
|
||||
failed_session_count = Integer()
|
||||
failure_details = Nested(_SMTPTLSFailureDetailsDoc)
|
||||
|
||||
def add_failure_details(self, result_type, ip_address,
|
||||
receiving_ip,
|
||||
receiving_mx_helo,
|
||||
failed_session_count,
|
||||
receiving_mx_hostname=None,
|
||||
additional_information_uri=None,
|
||||
failure_reason_code=None):
|
||||
self.failure_details.append(
|
||||
result_type=result_type,
|
||||
ip_address=ip_address,
|
||||
receiving_mx_hostname=receiving_mx_hostname,
|
||||
receiving_mx_helo=receiving_mx_helo,
|
||||
receiving_ip=receiving_ip,
|
||||
failed_session_count=failed_session_count,
|
||||
additional_information=additional_information_uri,
|
||||
failure_reason_code=failure_reason_code
|
||||
)
|
||||
|
||||
|
||||
class _SMTPTLSFailureReportDoc(Document):
|
||||
|
||||
class Index:
|
||||
name = "smtp_tls"
|
||||
|
||||
organization_name = Text()
|
||||
date_range = Date()
|
||||
date_begin = Date()
|
||||
date_end = Date()
|
||||
contact_info = Text()
|
||||
report_id = Text()
|
||||
policies = Nested(_SMTPTLSPolicyDoc)
|
||||
|
||||
def add_policy(self, policy_type, policy_domain,
|
||||
successful_session_count,
|
||||
failed_session_count,
|
||||
policy_string=None,
|
||||
mx_host_patterns=None,
|
||||
failure_details=None):
|
||||
self.policies.append(policy_type=policy_type,
|
||||
policy_domain=policy_domain,
|
||||
successful_session_count=successful_session_count,
|
||||
failed_session_count=failed_session_count,
|
||||
policy_string=policy_string,
|
||||
mx_host_patterns=mx_host_patterns,
|
||||
failure_details=failure_details)
|
||||
|
||||
|
||||
class AlreadySaved(ValueError):
|
||||
"""Raised when a report to be saved matches an existing report"""
|
||||
|
||||
|
||||
def set_hosts(hosts, use_ssl=False, ssl_cert_path=None,
|
||||
username=None, password=None, apiKey=None, timeout=60.0):
|
||||
"""
|
||||
Sets the OpenSearch hosts to use
|
||||
|
||||
Args:
|
||||
hosts (str|list): A single hostname or URL, or list of hostnames or URLs
|
||||
use_ssl (bool): Use an HTTPS connection to the server
|
||||
ssl_cert_path (str): Path to the certificate chain
|
||||
username (str): The username to use for authentication
|
||||
password (str): The password to use for authentication
|
||||
apiKey (str): The Base64 encoded API key to use for authentication
|
||||
timeout (float): Timeout in seconds
|
||||
"""
|
||||
if not isinstance(hosts, list):
|
||||
hosts = [hosts]
|
||||
conn_params = {
|
||||
"hosts": hosts,
|
||||
"timeout": timeout
|
||||
}
|
||||
if use_ssl:
|
||||
conn_params['use_ssl'] = True
|
||||
if ssl_cert_path:
|
||||
conn_params['verify_certs'] = True
|
||||
conn_params['ca_certs'] = ssl_cert_path
|
||||
else:
|
||||
conn_params['verify_certs'] = False
|
||||
if username:
|
||||
conn_params['http_auth'] = (username+":"+password)
|
||||
if apiKey:
|
||||
conn_params['api_key'] = apiKey
|
||||
connections.create_connection(**conn_params)
|
||||
|
||||
|
||||
def create_indexes(names, settings=None):
|
||||
"""
|
||||
Create OpenSearch indexes
|
||||
|
||||
Args:
|
||||
names (list): A list of index names
|
||||
settings (dict): Index settings
|
||||
|
||||
"""
|
||||
for name in names:
|
||||
index = Index(name)
|
||||
try:
|
||||
if not index.exists():
|
||||
logger.debug("Creating OpenSearch index: {0}".format(name))
|
||||
if settings is None:
|
||||
index.settings(number_of_shards=1,
|
||||
number_of_replicas=0)
|
||||
else:
|
||||
index.settings(**settings)
|
||||
index.create()
|
||||
except Exception as e:
|
||||
raise OpenSearchError(
|
||||
"OpenSearch error: {0}".format(e.__str__()))
|
||||
|
||||
|
||||
def migrate_indexes(aggregate_indexes=None, forensic_indexes=None):
|
||||
"""
|
||||
Updates index mappings
|
||||
|
||||
Args:
|
||||
aggregate_indexes (list): A list of aggregate index names
|
||||
forensic_indexes (list): A list of forensic index names
|
||||
"""
|
||||
version = 2
|
||||
if aggregate_indexes is None:
|
||||
aggregate_indexes = []
|
||||
if forensic_indexes is None:
|
||||
forensic_indexes = []
|
||||
for aggregate_index_name in aggregate_indexes:
|
||||
if not Index(aggregate_index_name).exists():
|
||||
continue
|
||||
aggregate_index = Index(aggregate_index_name)
|
||||
doc = "doc"
|
||||
fo_field = "published_policy.fo"
|
||||
fo = "fo"
|
||||
fo_mapping = aggregate_index.get_field_mapping(fields=[fo_field])
|
||||
fo_mapping = fo_mapping[list(fo_mapping.keys())[0]]["mappings"]
|
||||
if doc not in fo_mapping:
|
||||
continue
|
||||
|
||||
fo_mapping = fo_mapping[doc][fo_field]["mapping"][fo]
|
||||
fo_type = fo_mapping["type"]
|
||||
if fo_type == "long":
|
||||
new_index_name = "{0}-v{1}".format(aggregate_index_name, version)
|
||||
body = {"properties": {"published_policy.fo": {
|
||||
"type": "text",
|
||||
"fields": {
|
||||
"keyword": {
|
||||
"type": "keyword",
|
||||
"ignore_above": 256
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Index(new_index_name).create()
|
||||
Index(new_index_name).put_mapping(doc_type=doc, body=body)
|
||||
reindex(connections.get_connection(), aggregate_index_name,
|
||||
new_index_name)
|
||||
Index(aggregate_index_name).delete()
|
||||
|
||||
for forensic_index in forensic_indexes:
|
||||
pass
|
||||
|
||||
|
||||
def save_aggregate_report_to_opensearch(aggregate_report,
|
||||
index_suffix=None,
|
||||
monthly_indexes=False,
|
||||
number_of_shards=1,
|
||||
number_of_replicas=0):
|
||||
"""
|
||||
Saves a parsed DMARC aggregate report to OpenSearch
|
||||
|
||||
Args:
|
||||
aggregate_report (OrderedDict): A parsed forensic report
|
||||
index_suffix (str): The suffix of the name of the index to save to
|
||||
monthly_indexes (bool): Use monthly indexes instead of daily indexes
|
||||
number_of_shards (int): The number of shards to use in the index
|
||||
number_of_replicas (int): The number of replicas to use in the index
|
||||
|
||||
Raises:
|
||||
AlreadySaved
|
||||
"""
|
||||
logger.info("Saving aggregate report to OpenSearch")
|
||||
aggregate_report = aggregate_report.copy()
|
||||
metadata = aggregate_report["report_metadata"]
|
||||
org_name = metadata["org_name"]
|
||||
report_id = metadata["report_id"]
|
||||
domain = aggregate_report["policy_published"]["domain"]
|
||||
begin_date = human_timestamp_to_datetime(metadata["begin_date"],
|
||||
to_utc=True)
|
||||
end_date = human_timestamp_to_datetime(metadata["end_date"],
|
||||
to_utc=True)
|
||||
begin_date_human = begin_date.strftime("%Y-%m-%d %H:%M:%SZ")
|
||||
end_date_human = end_date.strftime("%Y-%m-%d %H:%M:%SZ")
|
||||
if monthly_indexes:
|
||||
index_date = begin_date.strftime("%Y-%m")
|
||||
else:
|
||||
index_date = begin_date.strftime("%Y-%m-%d")
|
||||
aggregate_report["begin_date"] = begin_date
|
||||
aggregate_report["end_date"] = end_date
|
||||
date_range = [aggregate_report["begin_date"],
|
||||
aggregate_report["end_date"]]
|
||||
|
||||
org_name_query = Q(dict(match_phrase=dict(org_name=org_name)))
|
||||
report_id_query = Q(dict(match_phrase=dict(report_id=report_id)))
|
||||
domain_query = Q(dict(match_phrase={"published_policy.domain": domain}))
|
||||
begin_date_query = Q(dict(match=dict(date_begin=begin_date)))
|
||||
end_date_query = Q(dict(match=dict(date_end=end_date)))
|
||||
|
||||
if index_suffix is not None:
|
||||
search = Search(index="dmarc_aggregate_{0}*".format(index_suffix))
|
||||
else:
|
||||
search = Search(index="dmarc_aggregate*")
|
||||
query = org_name_query & report_id_query & domain_query
|
||||
query = query & begin_date_query & end_date_query
|
||||
search.query = query
|
||||
|
||||
try:
|
||||
existing = search.execute()
|
||||
except Exception as error_:
|
||||
raise OpenSearchError("OpenSearch's search for existing report \
|
||||
error: {}".format(error_.__str__()))
|
||||
|
||||
if len(existing) > 0:
|
||||
raise AlreadySaved("An aggregate report ID {0} from {1} about {2} "
|
||||
"with a date range of {3} UTC to {4} UTC already "
|
||||
"exists in "
|
||||
"OpenSearch".format(report_id,
|
||||
org_name,
|
||||
domain,
|
||||
begin_date_human,
|
||||
end_date_human))
|
||||
published_policy = _PublishedPolicy(
|
||||
domain=aggregate_report["policy_published"]["domain"],
|
||||
adkim=aggregate_report["policy_published"]["adkim"],
|
||||
aspf=aggregate_report["policy_published"]["aspf"],
|
||||
p=aggregate_report["policy_published"]["p"],
|
||||
sp=aggregate_report["policy_published"]["sp"],
|
||||
pct=aggregate_report["policy_published"]["pct"],
|
||||
fo=aggregate_report["policy_published"]["fo"]
|
||||
)
|
||||
|
||||
for record in aggregate_report["records"]:
|
||||
agg_doc = _AggregateReportDoc(
|
||||
xml_schema=aggregate_report["xml_schema"],
|
||||
org_name=metadata["org_name"],
|
||||
org_email=metadata["org_email"],
|
||||
org_extra_contact_info=metadata["org_extra_contact_info"],
|
||||
report_id=metadata["report_id"],
|
||||
date_range=date_range,
|
||||
date_begin=aggregate_report["begin_date"],
|
||||
date_end=aggregate_report["end_date"],
|
||||
errors=metadata["errors"],
|
||||
published_policy=published_policy,
|
||||
source_ip_address=record["source"]["ip_address"],
|
||||
source_country=record["source"]["country"],
|
||||
source_reverse_dns=record["source"]["reverse_dns"],
|
||||
source_base_domain=record["source"]["base_domain"],
|
||||
message_count=record["count"],
|
||||
disposition=record["policy_evaluated"]["disposition"],
|
||||
dkim_aligned=record["policy_evaluated"]["dkim"] is not None and
|
||||
record["policy_evaluated"]["dkim"].lower() == "pass",
|
||||
spf_aligned=record["policy_evaluated"]["spf"] is not None and
|
||||
record["policy_evaluated"]["spf"].lower() == "pass",
|
||||
header_from=record["identifiers"]["header_from"],
|
||||
envelope_from=record["identifiers"]["envelope_from"],
|
||||
envelope_to=record["identifiers"]["envelope_to"]
|
||||
)
|
||||
|
||||
for override in record["policy_evaluated"]["policy_override_reasons"]:
|
||||
agg_doc.add_policy_override(type_=override["type"],
|
||||
comment=override["comment"])
|
||||
|
||||
for dkim_result in record["auth_results"]["dkim"]:
|
||||
agg_doc.add_dkim_result(domain=dkim_result["domain"],
|
||||
selector=dkim_result["selector"],
|
||||
result=dkim_result["result"])
|
||||
|
||||
for spf_result in record["auth_results"]["spf"]:
|
||||
agg_doc.add_spf_result(domain=spf_result["domain"],
|
||||
scope=spf_result["scope"],
|
||||
result=spf_result["result"])
|
||||
|
||||
index = "dmarc_aggregate"
|
||||
if index_suffix:
|
||||
index = "{0}_{1}".format(index, index_suffix)
|
||||
index = "{0}-{1}".format(index, index_date)
|
||||
index_settings = dict(number_of_shards=number_of_shards,
|
||||
number_of_replicas=number_of_replicas)
|
||||
create_indexes([index], index_settings)
|
||||
agg_doc.meta.index = index
|
||||
|
||||
try:
|
||||
agg_doc.save()
|
||||
except Exception as e:
|
||||
raise OpenSearchError(
|
||||
"OpenSearch error: {0}".format(e.__str__()))
|
||||
|
||||
|
||||
def save_forensic_report_to_opensearch(forensic_report,
|
||||
index_suffix=None,
|
||||
monthly_indexes=False,
|
||||
number_of_shards=1,
|
||||
number_of_replicas=0):
|
||||
"""
|
||||
Saves a parsed DMARC forensic report to OpenSearch
|
||||
|
||||
Args:
|
||||
forensic_report (OrderedDict): A parsed forensic report
|
||||
index_suffix (str): The suffix of the name of the index to save to
|
||||
monthly_indexes (bool): Use monthly indexes instead of daily
|
||||
indexes
|
||||
number_of_shards (int): The number of shards to use in the index
|
||||
number_of_replicas (int): The number of replicas to use in the
|
||||
index
|
||||
|
||||
Raises:
|
||||
AlreadySaved
|
||||
|
||||
"""
|
||||
logger.info("Saving forensic report to OpenSearch")
|
||||
forensic_report = forensic_report.copy()
|
||||
sample_date = None
|
||||
if forensic_report["parsed_sample"]["date"] is not None:
|
||||
sample_date = forensic_report["parsed_sample"]["date"]
|
||||
sample_date = human_timestamp_to_datetime(sample_date)
|
||||
original_headers = forensic_report["parsed_sample"]["headers"]
|
||||
headers = OrderedDict()
|
||||
for original_header in original_headers:
|
||||
headers[original_header.lower()] = original_headers[original_header]
|
||||
|
||||
arrival_date_human = forensic_report["arrival_date_utc"]
|
||||
arrival_date = human_timestamp_to_datetime(arrival_date_human)
|
||||
|
||||
if index_suffix is not None:
|
||||
search = Search(index="dmarc_forensic_{0}*".format(index_suffix))
|
||||
else:
|
||||
search = Search(index="dmarc_forensic*")
|
||||
arrival_query = {"match": {"arrival_date": arrival_date}}
|
||||
q = Q(arrival_query)
|
||||
|
||||
from_ = None
|
||||
to_ = None
|
||||
subject = None
|
||||
if "from" in headers:
|
||||
from_ = headers["from"]
|
||||
from_query = {"match_phrase": {"sample.headers.from": from_}}
|
||||
q = q & Q(from_query)
|
||||
if "to" in headers:
|
||||
to_ = headers["to"]
|
||||
to_query = {"match_phrase": {"sample.headers.to": to_}}
|
||||
q = q & Q(to_query)
|
||||
if "subject" in headers:
|
||||
subject = headers["subject"]
|
||||
subject_query = {"match_phrase": {"sample.headers.subject": subject}}
|
||||
q = q & Q(subject_query)
|
||||
|
||||
search.query = q
|
||||
existing = search.execute()
|
||||
|
||||
if len(existing) > 0:
|
||||
raise AlreadySaved("A forensic sample to {0} from {1} "
|
||||
"with a subject of {2} and arrival date of {3} "
|
||||
"already exists in "
|
||||
"OpenSearch".format(to_,
|
||||
from_,
|
||||
subject,
|
||||
arrival_date_human
|
||||
))
|
||||
|
||||
parsed_sample = forensic_report["parsed_sample"]
|
||||
sample = _ForensicSampleDoc(
|
||||
raw=forensic_report["sample"],
|
||||
headers=headers,
|
||||
headers_only=forensic_report["sample_headers_only"],
|
||||
date=sample_date,
|
||||
subject=forensic_report["parsed_sample"]["subject"],
|
||||
filename_safe_subject=parsed_sample["filename_safe_subject"],
|
||||
body=forensic_report["parsed_sample"]["body"]
|
||||
)
|
||||
|
||||
for address in forensic_report["parsed_sample"]["to"]:
|
||||
sample.add_to(display_name=address["display_name"],
|
||||
address=address["address"])
|
||||
for address in forensic_report["parsed_sample"]["reply_to"]:
|
||||
sample.add_reply_to(display_name=address["display_name"],
|
||||
address=address["address"])
|
||||
for address in forensic_report["parsed_sample"]["cc"]:
|
||||
sample.add_cc(display_name=address["display_name"],
|
||||
address=address["address"])
|
||||
for address in forensic_report["parsed_sample"]["bcc"]:
|
||||
sample.add_bcc(display_name=address["display_name"],
|
||||
address=address["address"])
|
||||
for attachment in forensic_report["parsed_sample"]["attachments"]:
|
||||
sample.add_attachment(filename=attachment["filename"],
|
||||
content_type=attachment["mail_content_type"],
|
||||
sha256=attachment["sha256"])
|
||||
try:
|
||||
forensic_doc = _ForensicReportDoc(
|
||||
feedback_type=forensic_report["feedback_type"],
|
||||
user_agent=forensic_report["user_agent"],
|
||||
version=forensic_report["version"],
|
||||
original_mail_from=forensic_report["original_mail_from"],
|
||||
arrival_date=arrival_date,
|
||||
domain=forensic_report["reported_domain"],
|
||||
original_envelope_id=forensic_report["original_envelope_id"],
|
||||
authentication_results=forensic_report["authentication_results"],
|
||||
delivery_results=forensic_report["delivery_result"],
|
||||
source_ip_address=forensic_report["source"]["ip_address"],
|
||||
source_country=forensic_report["source"]["country"],
|
||||
source_reverse_dns=forensic_report["source"]["reverse_dns"],
|
||||
source_base_domain=forensic_report["source"]["base_domain"],
|
||||
authentication_mechanisms=forensic_report[
|
||||
"authentication_mechanisms"],
|
||||
auth_failure=forensic_report["auth_failure"],
|
||||
dkim_domain=forensic_report["dkim_domain"],
|
||||
original_rcpt_to=forensic_report["original_rcpt_to"],
|
||||
sample=sample
|
||||
)
|
||||
|
||||
index = "dmarc_forensic"
|
||||
if index_suffix:
|
||||
index = "{0}_{1}".format(index, index_suffix)
|
||||
if monthly_indexes:
|
||||
index_date = arrival_date.strftime("%Y-%m")
|
||||
else:
|
||||
index_date = arrival_date.strftime("%Y-%m-%d")
|
||||
index = "{0}-{1}".format(index, index_date)
|
||||
index_settings = dict(number_of_shards=number_of_shards,
|
||||
number_of_replicas=number_of_replicas)
|
||||
create_indexes([index], index_settings)
|
||||
forensic_doc.meta.index = index
|
||||
try:
|
||||
forensic_doc.save()
|
||||
except Exception as e:
|
||||
raise OpenSearchError(
|
||||
"OpenSearch error: {0}".format(e.__str__()))
|
||||
except KeyError as e:
|
||||
raise InvalidForensicReport(
|
||||
"Forensic report missing required field: {0}".format(e.__str__()))
|
||||
|
||||
|
||||
def save_smtp_tls_report_to_opensearch(report,
|
||||
index_suffix=None,
|
||||
monthly_indexes=False,
|
||||
number_of_shards=1,
|
||||
number_of_replicas=0):
|
||||
"""
|
||||
Saves a parsed SMTP TLS report to OpenSearch
|
||||
|
||||
Args:
|
||||
report (OrderedDict): A parsed SMTP TLS report
|
||||
index_suffix (str): The suffix of the name of the index to save to
|
||||
monthly_indexes (bool): Use monthly indexes instead of daily indexes
|
||||
number_of_shards (int): The number of shards to use in the index
|
||||
number_of_replicas (int): The number of replicas to use in the index
|
||||
|
||||
Raises:
|
||||
AlreadySaved
|
||||
"""
|
||||
logger.info("Saving aggregate report to OpenSearch")
|
||||
org_name = report["org_name"]
|
||||
report_id = report["report_id"]
|
||||
begin_date = human_timestamp_to_datetime(report["begin_date"],
|
||||
to_utc=True)
|
||||
end_date = human_timestamp_to_datetime(report["end_date"],
|
||||
to_utc=True)
|
||||
begin_date_human = begin_date.strftime("%Y-%m-%d %H:%M:%SZ")
|
||||
end_date_human = end_date.strftime("%Y-%m-%d %H:%M:%SZ")
|
||||
if monthly_indexes:
|
||||
index_date = begin_date.strftime("%Y-%m")
|
||||
else:
|
||||
index_date = begin_date.strftime("%Y-%m-%d")
|
||||
report["begin_date"] = begin_date
|
||||
report["end_date"] = end_date
|
||||
|
||||
org_name_query = Q(dict(match_phrase=dict(org_name=org_name)))
|
||||
report_id_query = Q(dict(match_phrase=dict(report_id=report_id)))
|
||||
begin_date_query = Q(dict(match=dict(date_begin=begin_date)))
|
||||
end_date_query = Q(dict(match=dict(date_end=end_date)))
|
||||
|
||||
if index_suffix is not None:
|
||||
search = Search(index="smtp_tls_{0}*".format(index_suffix))
|
||||
else:
|
||||
search = Search(index="smtp_tls")
|
||||
query = org_name_query & report_id_query
|
||||
query = query & begin_date_query & end_date_query
|
||||
search.query = query
|
||||
|
||||
try:
|
||||
existing = search.execute()
|
||||
except Exception as error_:
|
||||
raise OpenSearchError("OpenSearch's search for existing report \
|
||||
error: {}".format(error_.__str__()))
|
||||
|
||||
if len(existing) > 0:
|
||||
raise AlreadySaved(f"An SMTP TLS report ID {report_id} from "
|
||||
f" {org_name} with a date range of "
|
||||
f"{begin_date_human} UTC to "
|
||||
f"{end_date_human} UTC already "
|
||||
"exists in OpenSearch")
|
||||
|
||||
index = "smtp_tls"
|
||||
if index_suffix:
|
||||
index = "{0}_{1}".format(index, index_suffix)
|
||||
index = "{0}-{1}".format(index, index_date)
|
||||
index_settings = dict(number_of_shards=number_of_shards,
|
||||
number_of_replicas=number_of_replicas)
|
||||
|
||||
smtp_tls_doc = _SMTPTLSFailureReportDoc(
|
||||
organization_name=report["organization_name"],
|
||||
date_range=[report["date_begin"], report["date_end"]],
|
||||
date_begin=report["date_begin"],
|
||||
date_end=report["date_end"],
|
||||
contact_info=report["contact_info"],
|
||||
report_id=report["report_id"]
|
||||
)
|
||||
|
||||
for policy in report['policies']:
|
||||
policy_strings = None
|
||||
mx_host_patterns = None
|
||||
if "policy_strings" in policy:
|
||||
policy_strings = policy["policy_strings"]
|
||||
if "mx_host_patterns" in policy:
|
||||
mx_host_patterns = policy["mx_host_patterns"]
|
||||
policy_doc = _SMTPTLSPolicyDoc(
|
||||
policy_domain=policy["policy_domain"],
|
||||
policy_type=policy["policy_type"],
|
||||
policy_string=policy_strings,
|
||||
mx_host_patterns=mx_host_patterns
|
||||
)
|
||||
if "failure_details" in policy:
|
||||
failure_details = policy["failure_details"]
|
||||
receiving_mx_hostname = None
|
||||
additional_information_uri = None
|
||||
failure_reason_code = None
|
||||
if "receiving_mx_hostname" in failure_details:
|
||||
receiving_mx_hostname = failure_details[
|
||||
"receiving_mx_hostname"]
|
||||
if "additional_information_uri" in failure_details:
|
||||
additional_information_uri = failure_details[
|
||||
"additional_information_uri"]
|
||||
if "failure_reason_code" in failure_details:
|
||||
failure_reason_code = failure_details["failure_reason_code"]
|
||||
policy_doc.add_failure_details(
|
||||
result_type=failure_details["result_type"],
|
||||
ip_address=failure_details["ip_address"],
|
||||
receiving_ip=failure_details["receiving_ip"],
|
||||
receiving_mx_helo=failure_details["receiving_mx_helo"],
|
||||
failed_session_count=failure_details["failed_session_count"],
|
||||
receiving_mx_hostname=receiving_mx_hostname,
|
||||
additional_information_uri=additional_information_uri,
|
||||
failure_reason_code=failure_reason_code
|
||||
)
|
||||
smtp_tls_doc.policies.append(policy_doc)
|
||||
|
||||
create_indexes([index], index_settings)
|
||||
smtp_tls_doc.meta.index = index
|
||||
|
||||
try:
|
||||
smtp_tls_doc.save()
|
||||
except Exception as e:
|
||||
raise OpenSearchError(
|
||||
"OpenSearch error: {0}".format(e.__str__()))
|
||||
@@ -48,6 +48,7 @@ dependencies = [
|
||||
"lxml>=4.4.0",
|
||||
"mailsuite>=1.6.1",
|
||||
"msgraph-core==0.2.2",
|
||||
"opensearch-py>=2.4.2,<=3.0.0",
|
||||
"publicsuffixlist>=0.10.0",
|
||||
"requests>=2.22.0",
|
||||
"tqdm>=4.31.1",
|
||||
|
||||
@@ -11,6 +11,7 @@ imapclient>=2.1.0
|
||||
dateparser>=1.1.1
|
||||
elasticsearch<7.14.0
|
||||
elasticsearch-dsl>=7.4.0
|
||||
opensearch-py>=2.4.2,<=3.0.0
|
||||
kafka-python>=1.4.4
|
||||
mailsuite>=1.6.1
|
||||
nose>=1.3.7
|
||||
|
||||
Reference in New Issue
Block a user