Merge branch 'master' into imap_batch_size

Sean Whalen
2021-06-19 11:40:36 -04:00
committed by GitHub
12 changed files with 396 additions and 105 deletions
+10 -3
@@ -128,11 +128,15 @@ For example
token = HECTokenGoesHere
index = email
[s3]
bucket = my-bucket
path = parsedmarc
The full set of configuration options is:
- ``general``
- ``save_aggregate`` - bool: Save aggregate report data to the Elasticsearch and/or Splunk
- ``save_forensic`` - bool: Save forensic report data to the Elasticsearch and/or Splunk
- ``save_aggregate`` - bool: Save aggregate report data to Elasticsearch, Splunk and/or S3
- ``save_forensic`` - bool: Save forensic report data to Elasticsearch, Splunk and/or S3
- ``strip_attachment_payloads`` - bool: Remove attachment payloads from results
- ``output`` - str: Directory to place JSON and CSV files in
- ``offline`` - bool: Do not use online queries for geolocation or DNS
@@ -145,7 +149,7 @@ The full set of configuration options are:
- ``chunk_size`` - int: Number of files to give to each process when running in parallel. Setting this to a number larger than one can improve performance when processing thousands of files
- ``imap``
- ``host`` - str: The IMAP server hostname or IP address
- ``port`` - int: The IMAP server port (Default: 993)
- ``port`` - int: The IMAP server port (Default: 993). If your hosting provider publishes a different port, try 993 anyway; other ports can fail with a "wrong SSL version" error
- ``ssl`` - bool: Use an encrypted SSL/TLS connection (Default: True)
- ``skip_certificate_verification`` - bool: Skip certificate verification (not recommended)
- ``user`` - str: The IMAP user
@@ -192,6 +196,9 @@ The full set of configuration options are:
- ``subject`` - str: The Subject header to use in the email (Default: parsedmarc report)
- ``attachment`` - str: The ZIP attachment filenames
- ``message`` - str: The email message (Default: Please see the attached parsedmarc report.)
- ``s3``
- ``bucket`` - str: The S3 bucket name
- ``path`` - str: The path to upload reports to (Default: /)
.. warning::
+4
@@ -18,3 +18,7 @@ ssl = False
url = https://splunkhec.example.com
token = HECTokenGoesHere
index = email
[s3]
bucket = my-bucket
path = parsedmarc
+9 -3
@@ -132,11 +132,15 @@ For example
token = HECTokenGoesHere
index = email
[s3]
bucket = my-bucket
path = parsedmarc
The full set of configuration options is:
- ``general``
- ``save_aggregate`` - bool: Save aggregate report data to the Elasticsearch and/or Splunk
- ``save_forensic`` - bool: Save forensic report data to the Elasticsearch and/or Splunk
- ``save_aggregate`` - bool: Save aggregate report data to Elasticsearch, Splunk and/or S3
- ``save_forensic`` - bool: Save forensic report data to Elasticsearch, Splunk and/or S3
- ``strip_attachment_payloads`` - bool: Remove attachment payloads from results
- ``output`` - str: Directory to place JSON and CSV files in
- ``offline`` - bool: Do not use online queries for geolocation or DNS
@@ -201,7 +205,9 @@ The full set of configuration options are:
- ``subject`` - str: The Subject header to use in the email (Default: parsedmarc report)
- ``attachment`` - str: The ZIP attachment filenames
- ``message`` - str: The email message (Default: Please see the attached parsedmarc report.)
- ``s3``
- ``bucket`` - str: The S3 bucket name
- ``path`` - str: The path to upload reports to (Default: /)
.. warning::
+92 -54
@@ -5,19 +5,19 @@
"type": "datasource",
"id": "elasticsearch",
"name": "Elasticsearch",
"version": "1.0.0"
"version": "7.11.2"
},
{
"type": "grafana",
"id": "grafana",
"name": "Grafana",
"version": "7.1.1"
"version": "7.4.5"
},
{
"type": "panel",
"id": "grafana-piechart-panel",
"name": "Pie Chart",
"version": "1.5.0"
"version": "1.6.1"
},
{
"type": "panel",
@@ -47,7 +47,7 @@
"type": "panel",
"id": "text",
"name": "Text",
"version": "7.1.0"
"version": "7.4.5"
}
],
"annotations": {
@@ -68,7 +68,7 @@
"gnetId": null,
"graphTooltip": 0,
"id": null,
"iteration": 1596560916058,
"iteration": 1616327630073,
"links": [],
"panels": [
{
@@ -460,14 +460,17 @@
"linewidth": 2,
"links": [],
"nullPointMode": "null",
"options": {
"alertThreshold": true
},
"percentage": false,
"pluginVersion": "7.1.1",
"pluginVersion": "7.4.5",
"pointradius": 2,
"points": false,
"renderer": "flot",
"seriesOverrides": [],
"spaceLength": 10,
"stack": true,
"stack": false,
"steppedLine": false,
"targets": [
{
@@ -604,8 +607,11 @@
"linewidth": 2,
"links": [],
"nullPointMode": "null",
"options": {
"alertThreshold": true
},
"percentage": false,
"pluginVersion": "7.1.1",
"pluginVersion": "7.4.5",
"pointradius": 2,
"points": false,
"renderer": "flot",
@@ -741,8 +747,11 @@
"linewidth": 2,
"links": [],
"nullPointMode": "null",
"options": {
"alertThreshold": true
},
"percentage": false,
"pluginVersion": "7.1.1",
"pluginVersion": "7.4.5",
"pointradius": 2,
"points": false,
"renderer": "flot",
@@ -880,8 +889,11 @@
"linewidth": 2,
"links": [],
"nullPointMode": "null",
"options": {
"alertThreshold": true
},
"percentage": false,
"pluginVersion": "7.1.1",
"pluginVersion": "7.4.5",
"pointradius": 2,
"points": false,
"renderer": "flot",
@@ -1019,8 +1031,11 @@
"linewidth": 2,
"links": [],
"nullPointMode": "null",
"options": {
"alertThreshold": true
},
"percentage": false,
"pluginVersion": "7.1.1",
"pluginVersion": "7.4.5",
"pointradius": 2,
"points": false,
"renderer": "flot",
@@ -1156,8 +1171,11 @@
"linewidth": 2,
"links": [],
"nullPointMode": "null",
"options": {
"alertThreshold": true
},
"percentage": false,
"pluginVersion": "7.1.1",
"pluginVersion": "7.4.5",
"pointradius": 2,
"points": false,
"renderer": "flot",
@@ -1264,7 +1282,6 @@
"value": "null"
}
],
"nullValueMode": "connected",
"thresholds": {
"mode": "absolute",
"steps": [
@@ -1299,9 +1316,10 @@
"fields": "",
"values": false
},
"text": {},
"textMode": "value_and_name"
},
"pluginVersion": "7.1.1",
"pluginVersion": "7.4.5",
"targets": [
{
"bucketAggs": [
@@ -1345,7 +1363,8 @@
"fieldConfig": {
"defaults": {
"custom": {
"align": null
"align": null,
"filterable": false
},
"mappings": [
{
@@ -1418,7 +1437,7 @@
}
]
},
"pluginVersion": "7.1.1",
"pluginVersion": "7.4.5",
"targets": [
{
"bucketAggs": [
@@ -1476,7 +1495,8 @@
"fieldConfig": {
"defaults": {
"custom": {
"align": null
"align": null,
"filterable": false
},
"mappings": [],
"thresholds": {
@@ -1553,7 +1573,7 @@
}
]
},
"pluginVersion": "7.1.1",
"pluginVersion": "7.4.5",
"targets": [
{
"bucketAggs": [
@@ -1658,7 +1678,8 @@
"fieldConfig": {
"defaults": {
"custom": {
"align": null
"align": null,
"filterable": false
},
"mappings": [],
"thresholds": {
@@ -1741,7 +1762,7 @@
}
]
},
"pluginVersion": "7.1.1",
"pluginVersion": "7.4.5",
"targets": [
{
"bucketAggs": [
@@ -1897,7 +1918,8 @@
"fieldConfig": {
"defaults": {
"custom": {
"align": null
"align": null,
"filterable": false
},
"mappings": [],
"thresholds": {
@@ -1974,7 +1996,7 @@
}
]
},
"pluginVersion": "7.1.1",
"pluginVersion": "7.4.5",
"targets": [
{
"bucketAggs": [
@@ -2032,7 +2054,8 @@
"fieldConfig": {
"defaults": {
"custom": {
"align": null
"align": null,
"filterable": false
},
"mappings": [],
"thresholds": {
@@ -2202,7 +2225,7 @@
}
]
},
"pluginVersion": "7.1.1",
"pluginVersion": "7.4.5",
"targets": [
{
"bucketAggs": [
@@ -2427,7 +2450,8 @@
"fieldConfig": {
"defaults": {
"custom": {
"align": null
"align": null,
"filterable": false
},
"mappings": [
{
@@ -2610,7 +2634,7 @@
}
]
},
"pluginVersion": "7.1.1",
"pluginVersion": "7.4.5",
"targets": [
{
"bucketAggs": [
@@ -2784,7 +2808,8 @@
"fieldConfig": {
"defaults": {
"custom": {
"align": null
"align": null,
"filterable": false
},
"mappings": [],
"thresholds": {
@@ -2904,7 +2929,7 @@
}
]
},
"pluginVersion": "7.1.1",
"pluginVersion": "7.4.5",
"targets": [
{
"bucketAggs": [
@@ -3007,7 +3032,8 @@
"fieldConfig": {
"defaults": {
"custom": {
"align": null
"align": null,
"filterable": false
},
"mappings": [],
"thresholds": {
@@ -3116,7 +3142,7 @@
}
]
},
"pluginVersion": "7.1.1",
"pluginVersion": "7.4.5",
"targets": [
{
"bucketAggs": [
@@ -3234,7 +3260,8 @@
"fieldConfig": {
"defaults": {
"custom": {
"align": null
"align": null,
"filterable": false
},
"mappings": [],
"thresholds": {
@@ -3347,7 +3374,7 @@
}
]
},
"pluginVersion": "7.1.1",
"pluginVersion": "7.4.5",
"targets": [
{
"bucketAggs": [
@@ -3496,7 +3523,8 @@
"fieldConfig": {
"defaults": {
"custom": {
"align": null
"align": null,
"filterable": false
},
"mappings": [],
"thresholds": {
@@ -3517,16 +3545,12 @@
{
"matcher": {
"id": "byName",
"options": "Arrival_Date(UTC)"
"options": "Arrival Date (UTC)"
},
"properties": [
{
"id": "unit",
"value": "dateTimeAsIso"
},
{
"id": "custom.width",
"value": 147
"value": "dateTimeAsSystem"
}
]
},
@@ -3592,14 +3616,14 @@
"showHeader": true,
"sortBy": []
},
"pluginVersion": "7.1.1",
"pluginVersion": "7.4.5",
"targets": [
{
"bucketAggs": [
{
"$$hashKey": "object:340",
"fake": true,
"field": "arrival_date",
"field": "Arrival Date (UTC)",
"id": "6",
"settings": {
"interval": "auto",
@@ -3744,7 +3768,7 @@
"indexByName": {},
"renameByName": {
"Count": "Count",
"arrival_date": "Arrival_Date(UTC)",
"arrival_date_utc": "Arrival_Date(UTC)",
"auth_failure.keyword": "AuthFailure",
"authentication_results.keyword": "Auth Results",
"delivery_results.keyword": "DeliveryResult",
@@ -3862,7 +3886,8 @@
"fieldConfig": {
"defaults": {
"custom": {
"align": null
"align": null,
"filterable": false
},
"mappings": [
{
@@ -3942,7 +3967,7 @@
"showHeader": true,
"sortBy": []
},
"pluginVersion": "7.1.1",
"pluginVersion": "7.4.5",
"targets": [
{
"bucketAggs": [
@@ -3999,7 +4024,8 @@
"fieldConfig": {
"defaults": {
"custom": {
"align": null
"align": null,
"filterable": false
},
"mappings": [],
"thresholds": {
@@ -4094,7 +4120,7 @@
}
]
},
"pluginVersion": "7.1.1",
"pluginVersion": "7.4.5",
"targets": [
{
"bucketAggs": [
@@ -4193,8 +4219,8 @@
"type": "table"
}
],
"refresh": false,
"schemaVersion": 26,
"refresh": "10s",
"schemaVersion": 27,
"style": "dark",
"tags": [
"DKIM",
@@ -4207,9 +4233,11 @@
{
"current": {
"selected": false,
"text": "Elasticsearch-dmarc-ag",
"value": "Elasticsearch-dmarc-ag"
"text": "dmarc-ag",
"value": "dmarc-ag"
},
"description": null,
"error": null,
"hide": 2,
"includeAll": false,
"label": "Datasource: Aggregate",
@@ -4225,9 +4253,11 @@
{
"current": {
"selected": false,
"text": "Elasticsearch-dmarc-fo",
"value": "Elasticsearch-dmarc-fo"
"text": "dmarc-fo",
"value": "dmarc-fo"
},
"description": null,
"error": null,
"hide": 2,
"includeAll": false,
"label": "Datasource: Forensic",
@@ -4242,9 +4272,15 @@
},
{
"allValue": null,
"current": {},
"current": {
"selected": false,
"text": "All",
"value": "$__all"
},
"datasource": "$datasourceag",
"definition": "{\"find\":\"terms\",\"field\":\"header_from.keyword\"}",
"description": null,
"error": null,
"hide": 0,
"includeAll": true,
"label": "From Domain",
@@ -4267,10 +4303,12 @@
"auto_count": 30,
"auto_min": "10s",
"current": {
"selected": true,
"selected": false,
"text": "1d",
"value": "1d"
},
"description": null,
"error": null,
"hide": 2,
"label": "Interval",
"name": "interval",
+4 -4
File diff suppressed because one or more lines are too long
+59 -22
@@ -8,6 +8,7 @@ import shutil
import xml.parsers.expat as expat
import json
from datetime import datetime
from time import sleep
from collections import OrderedDict
from io import BytesIO, StringIO
from gzip import GzipFile
@@ -203,7 +204,7 @@ def _parse_report_record(record, offline=False, nameservers=None,
def parse_aggregate_report_xml(xml, offline=False, nameservers=None,
timeout=2.0, parallel=False):
timeout=2.0, parallel=False, server=None):
"""Parses a DMARC XML report string and returns a consistent OrderedDict
Args:
@@ -213,6 +214,7 @@ def parse_aggregate_report_xml(xml, offline=False, nameservers=None,
(Cloudflare's public DNS resolvers by default)
timeout (float): Sets the DNS timeout in seconds
parallel (bool): Parallel processing
server (IMAPClient): Connection object
Returns:
OrderedDict: The parsed aggregate DMARC report
@@ -225,7 +227,8 @@ def parse_aggregate_report_xml(xml, offline=False, nameservers=None,
errors.append("Invalid XML: {0}".format(e.__str__()))
tree = etree.parse(BytesIO(xml.encode('utf-8')),
etree.XMLParser(recover=True))
xml = etree.tostring(tree).decode('utf-8')
s = etree.tostring(tree)
xml = '' if s is None else s.decode('utf-8')
try:
# Replace XML header (sometimes they are invalid)
@@ -304,8 +307,13 @@ def parse_aggregate_report_xml(xml, offline=False, nameservers=None,
new_report["policy_published"] = new_policy_published
if type(report["record"]) == list:
for record in report["record"]:
report_record = _parse_report_record(record,
for i in range(len(report["record"])):
if server is not None and i > 0 and i % 20 == 0:
logger.debug("Sending noop cmd")
server.noop()
logger.debug("Processed {0}/{1}".format(
i, len(report["record"])))
report_record = _parse_report_record(report["record"][i],
offline=offline,
nameservers=nameservers,
dns_timeout=timeout,
@@ -385,7 +393,8 @@ def extract_xml(input_):
def parse_aggregate_report_file(_input, offline=False, nameservers=None,
dns_timeout=2.0,
parallel=False):
parallel=False,
server=None):
"""Parses a file at the given path, a file-like object, or bytes as an
aggregate DMARC report
@@ -396,6 +405,7 @@ def parse_aggregate_report_file(_input, offline=False, nameservers=None,
(Cloudflare's public DNS resolvers by default)
dns_timeout (float): Sets the DNS timeout in seconds
parallel (bool): Parallel processing
server (IMAPClient): Connection object
Returns:
OrderedDict: The parsed DMARC aggregate report
@@ -406,7 +416,8 @@ def parse_aggregate_report_file(_input, offline=False, nameservers=None,
offline=offline,
nameservers=nameservers,
timeout=dns_timeout,
parallel=parallel)
parallel=parallel,
server=server)
def parsed_aggregate_reports_to_csv_rows(reports):
@@ -738,7 +749,7 @@ def parsed_forensic_reports_to_csv(reports):
def parse_report_email(input_, offline=False, nameservers=None,
dns_timeout=2.0, strip_attachment_payloads=False,
parallel=False):
parallel=False, server=None):
"""
Parses a DMARC report from an email
@@ -750,6 +761,7 @@ def parse_report_email(input_, offline=False, nameservers=None,
strip_attachment_payloads (bool): Remove attachment payloads from
forensic report results
parallel (bool): Parallel processing
server (IMAPClient): Connection object
Returns:
OrderedDict:
@@ -776,6 +788,8 @@ def parse_report_email(input_, offline=False, nameservers=None,
subject = None
feedback_report = None
sample = None
if "From" in msg_headers:
logger.info("Parsing mail from {0}".format(msg_headers["From"]))
if "Subject" in msg_headers:
subject = msg_headers["Subject"]
for part in msg.walk():
@@ -813,7 +827,8 @@ def parse_report_email(input_, offline=False, nameservers=None,
offline=offline,
nameservers=ns,
dns_timeout=dns_timeout,
parallel=parallel)
parallel=parallel,
server=server)
result = OrderedDict([("report_type", "aggregate"),
("report", aggregate_report)])
return result
@@ -863,7 +878,7 @@ def parse_report_email(input_, offline=False, nameservers=None,
def parse_report_file(input_, nameservers=None, dns_timeout=2.0,
strip_attachment_payloads=False,
offline=False, parallel=False):
offline=False, parallel=False, server=None):
"""Parses a DMARC aggregate or forensic file at the given path, a
file-like object, or bytes
@@ -876,6 +891,7 @@ def parse_report_file(input_, nameservers=None, dns_timeout=2.0,
forensic report results
offline (bool): Do not make online queries for geolocation or DNS
parallel (bool): Parallel processing
server (IMAPClient): Connection object
Returns:
OrderedDict: The parsed DMARC report
@@ -895,7 +911,8 @@ def parse_report_file(input_, nameservers=None, dns_timeout=2.0,
offline=offline,
nameservers=nameservers,
dns_timeout=dns_timeout,
parallel=parallel)
parallel=parallel,
server=server)
results = OrderedDict([("report_type", "aggregate"),
("report", report)])
except InvalidAggregateReport:
@@ -906,7 +923,8 @@ def parse_report_file(input_, nameservers=None, dns_timeout=2.0,
nameservers=nameservers,
dns_timeout=dns_timeout,
strip_attachment_payloads=sa,
parallel=parallel)
parallel=parallel,
server=server)
except InvalidDMARCReport:
raise InvalidDMARCReport("Not a valid aggregate or forensic "
"report")
@@ -943,7 +961,7 @@ def get_dmarc_reports_from_mbox(input_, nameservers=None, dns_timeout=2.0,
input_))
for i in range(len(message_keys)):
message_key = message_keys[i]
logger.debug("Processing message {0} of {1}".format(
logger.info("Processing message {0} of {1}".format(
i+1, total_messages
))
msg_content = mbox.get_string(message_key)
@@ -1062,10 +1080,11 @@ def get_dmarc_reports_from_inbox(connection=None,
max_retries=max_retries,
initial_folder=reports_folder)
server.create_folder(archive_folder)
server.create_folder(aggregate_reports_folder)
server.create_folder(forensic_reports_folder)
server.create_folder(invalid_reports_folder)
if not test:
server.create_folder(archive_folder)
server.create_folder(aggregate_reports_folder)
server.create_folder(forensic_reports_folder)
server.create_folder(invalid_reports_folder)
messages = server.search()
total_messages = len(messages)
@@ -1091,7 +1110,8 @@ def get_dmarc_reports_from_inbox(connection=None,
nameservers=nameservers,
dns_timeout=dns_timeout,
offline=offline,
strip_attachment_payloads=sa)
strip_attachment_payloads=sa,
server=server)
if parsed_email["report_type"] == "aggregate":
aggregate_reports.append(parsed_email["report"])
aggregate_report_msg_uids.append(msg_uid)
@@ -1246,9 +1266,18 @@ def watch_inbox(host, username, password, callback, port=None, ssl=True,
idle_timeout=idle_timeout)
except (timeout, IMAPClientError):
logger.warning("IMAP connection timeout. Reconnecting...")
sleep(5)
except Exception as e:
logger.warning("IMAP connection error. {0}. "
"Reconnecting...".format(e))
sleep(5)
def save_output(results, output_directory="output"):
def save_output(results, output_directory="output",
output_json_aggregate="aggregate.json",
output_json_forensic="forensic.json",
output_csv_aggregate="aggregate.csv",
output_csv_forensic="forensic.csv"):
"""
Save report data in the given directory
@@ -1266,22 +1295,30 @@ def save_output(results, output_directory="output"):
else:
os.makedirs(output_directory)
with open("{0}".format(os.path.join(output_directory, "aggregate.json")),
with open("{0}"
.format(os.path.join(output_directory,
output_json_aggregate)),
"w", newline="\n", encoding="utf-8") as agg_json:
agg_json.write(json.dumps(aggregate_reports, ensure_ascii=False,
indent=2))
with open("{0}".format(os.path.join(output_directory, "aggregate.csv")),
with open("{0}"
.format(os.path.join(output_directory,
output_csv_aggregate)),
"w", newline="\n", encoding="utf-8") as agg_csv:
csv = parsed_aggregate_reports_to_csv(aggregate_reports)
agg_csv.write(csv)
with open("{0}".format(os.path.join(output_directory, "forensic.json")),
with open("{0}"
.format(os.path.join(output_directory,
output_json_forensic)),
"w", newline="\n", encoding="utf-8") as for_json:
for_json.write(json.dumps(forensic_reports, ensure_ascii=False,
indent=2))
with open("{0}".format(os.path.join(output_directory, "forensic.csv")),
with open("{0}"
.format(os.path.join(output_directory,
output_csv_forensic)),
"w", newline="\n", encoding="utf-8") as for_csv:
csv = parsed_forensic_reports_to_csv(forensic_reports)
for_csv.write(csv)
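The record loop changed above pings the IMAP server every 20 records so that a long-running parse does not let the connection idle out. The following is a minimal sketch of that pattern in isolation, not the project's code: `process_records`, `parse_record`, `noop_every`, and `FakeServer` are hypothetical names introduced here, and the only assumption about the connection object is that it exposes a `noop()` method, as the diff's `server.noop()` call implies.

```python
def process_records(records, parse_record, server=None, noop_every=20):
    """Parse each record, pinging the server periodically (illustrative helper)."""
    parsed = []
    for i, record in enumerate(records):
        # Same condition as in the diff: skip index 0, then ping on every
        # Nth record, and only when a connection object was passed in.
        if server is not None and i > 0 and i % noop_every == 0:
            server.noop()
        parsed.append(parse_record(record))
    return parsed


class FakeServer:
    """Hypothetical stand-in for an IMAP connection; it only counts pings."""

    def __init__(self):
        self.noop_calls = 0

    def noop(self):
        self.noop_calls += 1


server = FakeServer()
# 45 records: pings happen at indexes 20 and 40.
results = process_records(list(range(45)), lambda r: r * 2, server=server)
print(server.noop_calls)
```

Passing `server=None` (the default) leaves the loop free of side effects, which is why the file-based and mbox code paths can call the same parser unchanged.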
+63 -3
@@ -19,7 +19,7 @@ from tqdm import tqdm
from parsedmarc import get_dmarc_reports_from_inbox, watch_inbox, \
parse_report_file, get_dmarc_reports_from_mbox, elastic, kafkaclient, \
splunk, save_output, email_results, ParserError, __version__, \
InvalidDMARCReport
InvalidDMARCReport, s3
from parsedmarc.utils import is_mbox
logger = logging.getLogger("parsedmarc")
@@ -79,6 +79,14 @@ def _main():
)
except Exception as error_:
logger.error("Kafka Error: {0}".format(error_.__str__()))
if opts.s3_bucket:
try:
s3_client = s3.S3Client(
bucket_name=opts.s3_bucket,
bucket_path=opts.s3_path,
)
except Exception as error_:
logger.error("S3 Error: {0}".format(error_.__str__()))
if opts.save_aggregate:
for report in reports_["aggregate_reports"]:
try:
@@ -104,6 +112,11 @@ def _main():
except Exception as error_:
logger.error("Kafka Error: {0}".format(
error_.__str__()))
try:
if opts.s3_bucket:
s3_client.save_aggregate_report_to_s3(report)
except Exception as error_:
logger.error("S3 Error: {0}".format(error_.__str__()))
if opts.hec:
try:
aggregate_reports_ = reports_["aggregate_reports"]
@@ -138,6 +151,11 @@ def _main():
except Exception as error_:
logger.error("Kafka Error: {0}".format(
error_.__str__()))
try:
if opts.s3_bucket:
s3_client.save_forensic_report_to_s3(report)
except Exception as error_:
logger.error("S3 Error: {0}".format(error_.__str__()))
if opts.hec:
try:
forensic_reports_ = reports_["forensic_reports"]
@@ -160,6 +178,18 @@ def _main():
help=strip_attachment_help, action="store_true")
arg_parser.add_argument("-o", "--output",
help="write output files to the given directory")
arg_parser.add_argument("--output-json-aggregate",
help="output aggregate JSON file",
default="aggregate.json")
arg_parser.add_argument("--output-json-forensic",
help="output forensic JSON file",
default="forensic.json")
arg_parser.add_argument("--output-csv-aggregate",
help="output aggregate CSV file",
default="aggregate.csv")
arg_parser.add_argument("--output-csv-forensic",
help="output forensic CSV file",
default="forensic.csv")
arg_parser.add_argument("-n", "--nameservers", nargs="+",
help="nameservers to query")
arg_parser.add_argument("-t", "--dns_timeout",
@@ -185,11 +215,16 @@ def _main():
forensic_reports = []
args = arg_parser.parse_args()
opts = Namespace(file_path=args.file_path,
config_file=args.config_file,
offline=args.offline,
strip_attachment_payloads=args.strip_attachment_payloads,
output=args.output,
output_json_aggregate=args.output_json_aggregate,
output_json_forensic=args.output_json_forensic,
output_csv_aggregate=args.output_csv_aggregate,
output_csv_forensic=args.output_csv_forensic,
nameservers=args.nameservers,
silent=args.silent,
dns_timeout=args.dns_timeout,
@@ -242,6 +277,8 @@ def _main():
smtp_to=[],
smtp_subject="parsedmarc report",
smtp_message="Please see the attached DMARC results.",
s3_bucket=None,
s3_path=None,
log_file=args.log_file,
n_procs=1,
chunk_size=1
@@ -474,6 +511,22 @@ def _main():
opts.smtp_attachment = smtp_config["attachment"]
if "message" in smtp_config:
opts.smtp_message = smtp_config["message"]
if "s3" in config.sections():
s3_config = config["s3"]
if "bucket" in s3_config:
opts.s3_bucket = s3_config["bucket"]
else:
logger.critical("bucket setting missing from the "
"s3 config section")
exit(-1)
if "path" in s3_config:
opts.s3_path = s3_config["path"]
if opts.s3_path.startswith("/"):
opts.s3_path = opts.s3_path[1:]
if opts.s3_path.endswith("/"):
opts.s3_path = opts.s3_path[:-1]
else:
opts.s3_path = ""
logging.basicConfig(level=logging.WARNING)
logger.setLevel(logging.WARNING)
@@ -495,6 +548,8 @@ def _main():
logger.error("You must supply input files, or an IMAP configuration")
exit(1)
logger.info("Starting parsedmarc")
if opts.save_aggregate or opts.save_forensic:
try:
if opts.elasticsearch_hosts:
@@ -633,7 +688,11 @@ def _main():
("forensic_reports", forensic_reports)])
if opts.output:
save_output(results, output_directory=opts.output)
save_output(results, output_directory=opts.output,
output_json_aggregate=opts.output_json_aggregate,
output_json_forensic=opts.output_json_forensic,
output_csv_aggregate=opts.output_csv_aggregate,
output_csv_forensic=opts.output_csv_forensic)
process_reports(results)
@@ -678,7 +737,8 @@ def _main():
dns_timeout=opts.dns_timeout,
strip_attachment_payloads=sa,
batch_size=opts.imap_batch_size
)
offline=opts.offline,
strip_attachment_payloads=sa)
except FileExistsError as error:
logger.error("{0}".format(error.__str__()))
exit(1)
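The `[s3]` handling added above strips one leading and one trailing slash from `path`, and falls back to an empty string when the option is absent. This can be sketched on its own as follows; `normalize_s3_path` is a hypothetical helper name used only for illustration, and the inline INI text mirrors the example configuration from the documentation hunk.

```python
import configparser


def normalize_s3_path(path):
    """Strip one leading and one trailing slash, as the CLI hunk does."""
    if path.startswith("/"):
        path = path[1:]
    if path.endswith("/"):
        path = path[:-1]
    return path


config = configparser.ConfigParser()
config.read_string("""
[s3]
bucket = my-bucket
path = /parsedmarc/
""")

s3_config = config["s3"]
bucket = s3_config["bucket"]
# A missing "path" option falls back to an empty prefix, as in the diff.
path = normalize_s3_path(s3_config["path"]) if "path" in s3_config else ""
print(bucket, path)
```

Note that only a single slash is removed from each end, so a value such as `//reports//` would keep one slash on each side.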
+16 -10
@@ -295,16 +295,16 @@ def save_aggregate_report_to_elasticsearch(aggregate_report,
Raises:
AlreadySaved
"""
logger.debug("Saving aggregate report to Elasticsearch")
logger.info("Saving aggregate report to Elasticsearch")
aggregate_report = aggregate_report.copy()
metadata = aggregate_report["report_metadata"]
org_name = metadata["org_name"]
report_id = metadata["report_id"]
domain = aggregate_report["policy_published"]["domain"]
begin_date = human_timestamp_to_datetime(metadata["begin_date"])
end_date = human_timestamp_to_datetime(metadata["end_date"])
begin_date_human = begin_date.strftime("%Y-%m-%d %H:%M:%S")
end_date_human = end_date.strftime("%Y-%m-%d %H:%M:%S")
begin_date = human_timestamp_to_datetime(metadata["begin_date"], to_utc=True)
end_date = human_timestamp_to_datetime(metadata["end_date"], to_utc=True)
begin_date_human = begin_date.strftime("%Y-%m-%d %H:%M:%SZ")
end_date_human = end_date.strftime("%Y-%m-%d %H:%M:%SZ")
if monthly_indexes:
index_date = begin_date.strftime("%Y-%m")
else:
@@ -317,10 +317,13 @@ def save_aggregate_report_to_elasticsearch(aggregate_report,
org_name_query = Q(dict(match_phrase=dict(org_name=org_name)))
report_id_query = Q(dict(match_phrase=dict(report_id=report_id)))
domain_query = Q(dict(match_phrase={"published_policy.domain": domain}))
begin_date_query = Q(dict(match=dict(date_range=begin_date)))
end_date_query = Q(dict(match=dict(date_range=end_date)))
begin_date_query = Q(dict(match=dict(date_begin=begin_date)))
end_date_query = Q(dict(match=dict(date_end=end_date)))
search = Search(index="dmarc_aggregate*")
if index_suffix is not None:
search = Search(index="dmarc_aggregate_{0}*".format(index_suffix))
else:
search = Search(index="dmarc_aggregate*")
query = org_name_query & report_id_query & domain_query
query = query & begin_date_query & end_date_query
search.query = query
@@ -423,7 +426,7 @@ def save_forensic_report_to_elasticsearch(forensic_report,
AlreadySaved
"""
logger.debug("Saving forensic report to Elasticsearch")
logger.info("Saving forensic report to Elasticsearch")
forensic_report = forensic_report.copy()
sample_date = None
if forensic_report["parsed_sample"]["date"] is not None:
@@ -437,7 +440,10 @@ def save_forensic_report_to_elasticsearch(forensic_report,
arrival_date_human = forensic_report["arrival_date_utc"]
arrival_date = human_timestamp_to_datetime(arrival_date_human)
search = Search(index="dmarc_forensic*")
if index_suffix is not None:
search = Search(index="dmarc_forensic_{0}*".format(index_suffix))
else:
search = Search(index="dmarc_forensic*")
arrival_query = {"match": {"arrival_date": arrival_date}}
q = Q(arrival_query)
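Both Elasticsearch hunks above choose the duplicate-check search pattern based on whether `index_suffix` is set. A minimal sketch of that selection logic, with no Elasticsearch dependency, might look like the following; `index_pattern` is a hypothetical helper name, and `"prod"` is just an example suffix.

```python
def index_pattern(base, index_suffix=None):
    """Return the wildcard index pattern used for the duplicate search."""
    if index_suffix is not None:
        # With a suffix, only indexes named <base>_<suffix>* are searched.
        return "{0}_{1}*".format(base, index_suffix)
    return "{0}*".format(base)


print(index_pattern("dmarc_aggregate"))
print(index_pattern("dmarc_forensic", "prod"))
```

Scoping the search to the suffixed indexes means a report already stored under one suffix would not be detected as a duplicate when saving under another.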
+68
@@ -0,0 +1,68 @@
# -*- coding: utf-8 -*-
import logging
import json
import boto3
from parsedmarc.utils import human_timestamp_to_datetime
logger = logging.getLogger("parsedmarc")
class S3Client(object):
"""A client for Amazon S3"""
def __init__(self, bucket_name, bucket_path):
"""
Initializes the S3Client
Args:
bucket_name (str): The S3 Bucket
bucket_path (str): The path to save reports
"""
self.bucket_name = bucket_name
self.bucket_path = bucket_path
self.metadata_keys = [
"org_name",
"org_email",
"report_id",
"begin_date",
"end_date",
]
self.s3 = boto3.resource('s3')
self.bucket = self.s3.Bucket(self.bucket_name)
def save_aggregate_report_to_s3(self, report):
self.save_report_to_s3(report, 'aggregate')
def save_forensic_report_to_s3(self, report):
self.save_report_to_s3(report, 'forensic')
def save_report_to_s3(self, report, report_type):
report_date = human_timestamp_to_datetime(
report["report_metadata"]["begin_date"]
)
report_id = report["report_metadata"]["report_id"]
path_template = "{0}/{1}/year={2}/month={3:02d}/day={4:02d}/{5}.json"
object_path = path_template.format(
self.bucket_path,
report_type,
report_date.year,
report_date.month,
report_date.day,
report_id
)
logger.debug("Saving {0} report to s3://{1}/{2}".format(
report_type,
self.bucket_name,
object_path))
object_metadata = {
k: v
for k, v in report["report_metadata"].items()
if k in self.metadata_keys
}
self.bucket.put_object(
Body=json.dumps(report),
Key=object_path,
Metadata=object_metadata
)
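The object key built by `save_report_to_s3` partitions reports by type and by date (`year=`/`month=`/`day=`). The sketch below reproduces only the key formatting so the layout can be inspected without AWS credentials; `build_object_key` and the sample report ID are hypothetical, while the template string is copied from the new file.

```python
from datetime import datetime

# Template copied from the new s3.py module.
PATH_TEMPLATE = "{0}/{1}/year={2}/month={3:02d}/day={4:02d}/{5}.json"


def build_object_key(bucket_path, report_type, report_date, report_id):
    """Format an object key the same way save_report_to_s3 does."""
    return PATH_TEMPLATE.format(
        bucket_path,
        report_type,
        report_date.year,
        report_date.month,
        report_date.day,
        report_id,
    )


key = build_object_key("parsedmarc", "aggregate",
                       datetime(2021, 6, 9), "example-report-id")
print(key)

# With an empty prefix the key keeps the template's leading slash.
bare_key = build_object_key("", "forensic",
                            datetime(2021, 6, 9), "example-report-id")
print(bare_key)
```

Because the CLI sets the path to an empty string when `path` is not configured, keys in that case begin with `/`, which S3 treats as part of the key name rather than as a root directory.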
+1
@@ -27,3 +27,4 @@ sphinx_rtd_theme>=0.4.3
wheel>=0.33.6
codecov>=2.0.15
lxml>=4.4.0
boto3>=1.16.63
+2 -1
@@ -98,7 +98,8 @@ setup(
'elasticsearch-dsl>=7.2.0,<8.0.0',
'kafka-python>=1.4.4',
'tqdm>=4.31.1',
'lxml>=4.4.0'
'lxml>=4.4.0',
'boto3>=1.16.63'
],
entry_points={
+68 -5
@@ -1,9 +1,53 @@
=================
Splunk dashboards
=================
===================
Splunk Installation
===================
Setup guide
-----------
Install Splunk for use with Docker
----------------------------------
Download latest Splunk image::
docker pull splunk/splunk:latest
Run Splunk with Docker
----------------------
Listen on all network interfaces::
docker run -d -p 8000:8000 -p 8088:8088 -e "SPLUNK_START_ARGS=--accept-license" -e "SPLUNK_PASSWORD=password1234" -e "SPLUNK_HEC_TOKEN=hec-token-1234" --name splunk splunk/splunk:latest
Listen on localhost for use with reverse proxy with base URL ``/splunk``::
docker run -d -p 127.0.0.1:8000:8000 -p 127.0.0.1:8088:8088 -e "SPLUNK_START_ARGS=--accept-license" -e "SPLUNK_PASSWORD=password1234" -e "SPLUNK_HEC_TOKEN=hec-token-1234" -e "SPLUNK_ROOT_ENDPOINT=/splunk" --name splunk splunk/splunk:latest
Set up reverse proxy, e.g. Apache2::
ProxyPass /splunk http://127.0.0.1:8000/splunk
ProxyPassReverse /splunk http://127.0.0.1:8000/splunk
Splunk Configuration
--------------------
Access web UI at http://127.0.0.1:8000 and log in with ``admin:password1234``.
Create App and Index
~~~~~~~~~~~~~~~~~~~~
- Settings > Data > Indexes: New Index
- Index name: "email"
- HEC token ``hec-token-1234`` should already be set up.
- Check under Settings > Data > Data inputs: HTTP Event Collector
- Apps > Manage Apps: Create app
- Name: "parsedmarc"
- Folder name: "parsedmarc"
Create Dashboards
~~~~~~~~~~~~~~~~~
1. Navigate to the app you want to add the dashboards to, or create a new app called DMARC
2. Click Dashboards
@@ -22,3 +66,22 @@ Setup guide
15. Paste the content of ``dmarc_forensic_dashboard.xml`` into the source editor
16. If the index storing the DMARC data is not named email, replace index="email" accordingly
17. Click Save
==============
Example Config
==============
parsedmarc.ini::
[splunk_hec]
url = https://127.0.0.1:8088/
token = hec-token-1234
index = email
skip_certificate_verification = True
Note that ``skip_certificate_verification = True`` disables security checks.
Run parsedmarc::
python3 -m parsedmarc.cli -c parsedmarc.ini