From e0c532c7eb01af15b979b7a3f44e48a774c4e3b4 Mon Sep 17 00:00:00 2001 From: Mike Siegel Date: Thu, 25 Oct 2018 15:38:18 -0400 Subject: [PATCH 1/4] - Moved report metadata and moved report_id, org_email and org_name up a level in JSON object - Send individual slices of report due to Kafka message size limit being 1MB - Date calculations from ES client to aid in dashboard display --- parsedmarc/__version__.py | 2 +- parsedmarc/kafkaclient.py | 83 +++++++++++++++++++++++++++++++-------- 2 files changed, 67 insertions(+), 18 deletions(-) diff --git a/parsedmarc/__version__.py b/parsedmarc/__version__.py index 5087b1d..28894d1 100644 --- a/parsedmarc/__version__.py +++ b/parsedmarc/__version__.py @@ -2,7 +2,7 @@ import platform -__version__ = "4.3.8" +__version__ = "4.3.9" USER_AGENT = "Mozilla/5.0 (({0} {1})) parsedmarc/{2}".format( platform.system(), diff --git a/parsedmarc/kafkaclient.py b/parsedmarc/kafkaclient.py index 590ce4d..923b882 100644 --- a/parsedmarc/kafkaclient.py +++ b/parsedmarc/kafkaclient.py @@ -5,7 +5,8 @@ import json from kafka import KafkaProducer from kafka.errors import NoBrokersAvailable, UnknownTopicOrPartitionError - +from collections import OrderedDict +from parsedmarc.utils import human_timestamp_to_datetime, timestamp_to_human logger = logging.getLogger("parsedmarc") @@ -24,6 +25,37 @@ class KafkaClient(object): except NoBrokersAvailable: raise KafkaError("No Kafka brokers available") + def strip_metadata(self, report): + """ + Duplicates org_name, org_email and report_id into JSON root + and removes report_metadata key to bring it more in line + with Elastic output.
+ """ + report['org_name'] = report['report_metadata']['org_name'] + report['org_email'] = report['report_metadata']['org_email'] + report['report_id'] = report['report_metadata']['report_id'] + report.pop('report_metadata') + + return report + + def generate_daterange(self, report): + """ + Creates a date_range timestamp with format YYYY-MM-DD-T-HH:MM:SS + based on begin and end dates for easier parsing in Kibana. + + Move to utils to avoid duplication w/ elastic? + """ + + metadata = report["report_metadata"] + begin_date = human_timestamp_to_datetime(metadata["begin_date"]) + end_date = human_timestamp_to_datetime(metadata["end_date"]) + begin_date_human = begin_date.strftime("%Y-%m-%d %H:%M:%S") + end_date_human = end_date.strftime("%Y-%m-%d %H:%M:%S") + date_range = [begin_date_human, + end_date_human] + logger.debug("date_range is {}".format(date_range)) + return date_range + def save_aggregate_reports_to_kafka(self, aggregate_reports, aggregate_topic): """ @@ -35,30 +67,47 @@ class KafkaClient(object): aggregate_topic (str): The name of the Kafka topic """ - if type(aggregate_reports) == dict: + logger.debug("aggregate_reports_save was called with aggr_report" + "type {}".format(type(aggregate_reports))) + if (type(aggregate_reports) == dict or + type(aggregate_reports) == OrderedDict): aggregate_reports = [aggregate_reports] if len(aggregate_reports) < 1: return - try: - logger.debug("Saving aggregate reports to Kafka") - self.producer.send(aggregate_topic, aggregate_reports) - except UnknownTopicOrPartitionError: - raise KafkaError( - "Kafka error: Unknown topic or partition on broker") - except Exception as e: - raise KafkaError( - "Kafka error: {0}".format(e.__str__())) - try: - self.producer.flush() - except Exception as e: - raise KafkaError( - "Kafka error: {0}".format(e.__str__())) + for report in aggregate_reports: + report['date_range'] = self.generate_daterange(report) + report = self.strip_metadata(report) + + for slice in report['records']: + 
slice['date_range'] = report['date_range'] + slice['org_name'] = report['org_name'] + slice['org_email'] = report['org_email'] + slice['policy_published'] = report['policy_published'] + slice['report_id'] = report['report_id'] + logger.debug("Sending slice.") + try: + logger.debug("Saving aggregate report to Kafka") + self.producer.send(aggregate_topic, slice) + except UnknownTopicOrPartitionError: + raise KafkaError( + "Kafka error: Unknown topic or partition" + " on broker") + except Exception as e: + raise KafkaError( + "Kafka error: {0}".format(e.__str__())) + try: + self.producer.flush() + except Exception as e: + raise KafkaError( + "Kafka error: {0}".format(e.__str__())) def save_forensic_reports_to_kafka(self, forensic_reports, forensic_topic): """ - Saves forensic DMARC reports to Kafka + Saves forensic DMARC reports to Kafka, sends individual + records (slices) since Kafka requires messages to be <= 1MB + by default. Args: forensic_reports (list): A list of forensic report dicts From a543cb4e44ea845895be5304f04d7585b7c6788c Mon Sep 17 00:00:00 2001 From: Mike Siegel Date: Mon, 29 Oct 2018 07:48:01 -0400 Subject: [PATCH 2/4] Add T separator in datetime format --- parsedmarc/kafkaclient.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/parsedmarc/kafkaclient.py b/parsedmarc/kafkaclient.py index 923b882..40b2700 100644 --- a/parsedmarc/kafkaclient.py +++ b/parsedmarc/kafkaclient.py @@ -49,8 +49,8 @@ class KafkaClient(object): metadata = report["report_metadata"] begin_date = human_timestamp_to_datetime(metadata["begin_date"]) end_date = human_timestamp_to_datetime(metadata["end_date"]) - begin_date_human = begin_date.strftime("%Y-%m-%d %H:%M:%S") - end_date_human = end_date.strftime("%Y-%m-%d %H:%M:%S") + begin_date_human = begin_date.strftime("%Y-%m-%dT%H:%M:%S") + end_date_human = end_date.strftime("%Y-%m-%dT%H:%M:%S") date_range = [begin_date_human, end_date_human] logger.debug("date_range is {}".format(date_range)) @@ -67,8 +67,6 
@@ class KafkaClient(object): aggregate_topic (str): The name of the Kafka topic """ - logger.debug("aggregate_reports_save was called with aggr_report" - "type {}".format(type(aggregate_reports))) if (type(aggregate_reports) == dict or type(aggregate_reports) == OrderedDict): aggregate_reports = [aggregate_reports] From 0e398f2c8d9192036e57b299d44d7f06d3a06d21 Mon Sep 17 00:00:00 2001 From: Mike Siegel Date: Mon, 29 Oct 2018 07:51:21 -0400 Subject: [PATCH 3/4] removing unused import --- parsedmarc/kafkaclient.py | 1 - 1 file changed, 1 deletion(-) diff --git a/parsedmarc/kafkaclient.py b/parsedmarc/kafkaclient.py index 40b2700..411b4d9 100644 --- a/parsedmarc/kafkaclient.py +++ b/parsedmarc/kafkaclient.py @@ -6,7 +6,6 @@ import json from kafka import KafkaProducer from kafka.errors import NoBrokersAvailable, UnknownTopicOrPartitionError from collections import OrderedDict -from parsedmarc.utils import human_timestamp_to_datetime, timestamp_to_human logger = logging.getLogger("parsedmarc") From 78e796a97c27c9260393ffdf29696c24d1216683 Mon Sep 17 00:00:00 2001 From: Mike Siegel Date: Mon, 29 Oct 2018 08:08:40 -0400 Subject: [PATCH 4/4] Fixing import --- parsedmarc/kafkaclient.py | 1 + 1 file changed, 1 insertion(+) diff --git a/parsedmarc/kafkaclient.py b/parsedmarc/kafkaclient.py index 411b4d9..93d34b1 100644 --- a/parsedmarc/kafkaclient.py +++ b/parsedmarc/kafkaclient.py @@ -6,6 +6,7 @@ import json from kafka import KafkaProducer from kafka.errors import NoBrokersAvailable, UnknownTopicOrPartitionError from collections import OrderedDict +from parsedmarc.utils import human_timestamp_to_datetime logger = logging.getLogger("parsedmarc")