Merge pull request #30 from mikesiegel/msiegel_kafkafix

A few Kafka client tweaks and changes
Sean Whalen
2018-11-06 17:31:27 -05:00
committed by GitHub
2 changed files with 65 additions and 18 deletions

parsedmarc/__init__.py

@@ -2,7 +2,7 @@
 import platform

-__version__ = "4.3.8"
+__version__ = "4.3.9"

 USER_AGENT = "Mozilla/5.0 (({0} {1})) parsedmarc/{2}".format(
     platform.system(),

parsedmarc/kafkaclient.py

@@ -5,7 +5,8 @@ import json
 from kafka import KafkaProducer
 from kafka.errors import NoBrokersAvailable, UnknownTopicOrPartitionError
+from collections import OrderedDict
+from parsedmarc.utils import human_timestamp_to_datetime

 logger = logging.getLogger("parsedmarc")
@@ -24,6 +25,37 @@ class KafkaClient(object):
         except NoBrokersAvailable:
             raise KafkaError("No Kafka brokers available")

+    def strip_metadata(self, report):
+        """
+        Duplicates org_name, org_email and report_id into the JSON root,
+        and removes the report_metadata key, to bring the output more in
+        line with the Elasticsearch output.
+        """
+        report['org_name'] = report['report_metadata']['org_name']
+        report['org_email'] = report['report_metadata']['org_email']
+        report['report_id'] = report['report_metadata']['report_id']
+        report.pop('report_metadata')
+        return report
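
For illustration, a before/after sketch of what strip_metadata does to a report dict (the field values here are hypothetical):

    before = {
        "report_metadata": {
            "org_name": "example.com",
            "org_email": "noreply@example.com",
            "report_id": "2018.11.06.12345",
        },
        "policy_published": {"domain": "example.net"},
        "records": [],
    }

    # After strip_metadata(before), the metadata fields sit at the root
    # and report_metadata itself is gone:
    after = {
        "org_name": "example.com",
        "org_email": "noreply@example.com",
        "report_id": "2018.11.06.12345",
        "policy_published": {"domain": "example.net"},
        "records": [],
    }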
+    def generate_daterange(self, report):
+        """
+        Creates a date_range pair in YYYY-MM-DDTHH:MM:SS format from the
+        report's begin and end dates, for easier parsing in Kibana.
+
+        TODO: move to utils to avoid duplication with the Elasticsearch
+        client?
+        """
+        metadata = report["report_metadata"]
+        begin_date = human_timestamp_to_datetime(metadata["begin_date"])
+        end_date = human_timestamp_to_datetime(metadata["end_date"])
+        begin_date_human = begin_date.strftime("%Y-%m-%dT%H:%M:%S")
+        end_date_human = end_date.strftime("%Y-%m-%dT%H:%M:%S")
+        date_range = [begin_date_human,
+                      end_date_human]
+        logger.debug("date_range is {}".format(date_range))
+        return date_range
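
As a quick sketch of the expected return value, assuming begin_date and end_date arrive as "YYYY-MM-DD HH:MM:SS" strings (dates hypothetical):

    report = {"report_metadata": {"begin_date": "2018-11-05 00:00:00",
                                  "end_date": "2018-11-05 23:59:59"}}

    # generate_daterange(report) reformats both ends with
    # strftime("%Y-%m-%dT%H:%M:%S") and returns the pair:
    # ["2018-11-05T00:00:00", "2018-11-05T23:59:59"]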
     def save_aggregate_reports_to_kafka(self, aggregate_reports,
                                         aggregate_topic):
         """
@@ -35,30 +67,45 @@ class KafkaClient(object):
             aggregate_topic (str): The name of the Kafka topic
         """
-        if type(aggregate_reports) == dict:
+        if (type(aggregate_reports) == dict or
+                type(aggregate_reports) == OrderedDict):
             aggregate_reports = [aggregate_reports]

         if len(aggregate_reports) < 1:
             return
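
Worth noting: OrderedDict is a subclass of dict, so the two exact-type checks above could collapse into a single isinstance test; a sketch of the equivalent guard:

    # isinstance() matches dict and every dict subclass, OrderedDict included
    if isinstance(aggregate_reports, dict):
        aggregate_reports = [aggregate_reports]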
-        try:
-            logger.debug("Saving aggregate reports to Kafka")
-            self.producer.send(aggregate_topic, aggregate_reports)
-        except UnknownTopicOrPartitionError:
-            raise KafkaError(
-                "Kafka error: Unknown topic or partition on broker")
-        except Exception as e:
-            raise KafkaError(
-                "Kafka error: {0}".format(e.__str__()))
-        try:
-            self.producer.flush()
-        except Exception as e:
-            raise KafkaError(
-                "Kafka error: {0}".format(e.__str__()))
+        for report in aggregate_reports:
+            report['date_range'] = self.generate_daterange(report)
+            report = self.strip_metadata(report)
+
+            for slice in report['records']:
+                slice['date_range'] = report['date_range']
+                slice['org_name'] = report['org_name']
+                slice['org_email'] = report['org_email']
+                slice['policy_published'] = report['policy_published']
+                slice['report_id'] = report['report_id']
+                logger.debug("Sending slice.")
+                try:
+                    logger.debug("Saving aggregate report to Kafka")
+                    self.producer.send(aggregate_topic, slice)
+                except UnknownTopicOrPartitionError:
+                    raise KafkaError(
+                        "Kafka error: Unknown topic or partition on broker")
+                except Exception as e:
+                    raise KafkaError(
+                        "Kafka error: {0}".format(e.__str__()))
+                try:
+                    self.producer.flush()
+                except Exception as e:
+                    raise KafkaError(
+                        "Kafka error: {0}".format(e.__str__()))
     def save_forensic_reports_to_kafka(self, forensic_reports,
                                        forensic_topic):
         """
-        Saves forensic DMARC reports to Kafka
+        Saves forensic DMARC reports to Kafka, sending individual
+        records (slices), since Kafka limits messages to 1 MB by
+        default.

         Args:
             forensic_reports (list): A list of forensic report dicts
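
The 1 MB ceiling mentioned in the docstring is configurable if larger messages are ever needed; a minimal kafka-python sketch, with a hypothetical broker address (the broker must also raise message.max.bytes on its side):

    from kafka import KafkaProducer

    # max_request_size defaults to 1048576 bytes (1 MB) in kafka-python
    producer = KafkaProducer(
        bootstrap_servers=["kafka-1:9092"],  # hypothetical broker
        max_request_size=5 * 1024 * 1024,    # allow up to 5 MB per request
    )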