diff --git a/CHANGELOG.md b/CHANGELOG.md index a89de07..f5eaa31 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,8 @@ +5.1.0 +----- + +- Add support for TLS/SSL and username/password auth to Kafka + 5.0.2 ----- diff --git a/parsedmarc/__init__.py b/parsedmarc/__init__.py index b3797c5..5bf3072 100644 --- a/parsedmarc/__init__.py +++ b/parsedmarc/__init__.py @@ -38,7 +38,7 @@ from parsedmarc.utils import is_outlook_msg, convert_outlook_msg from parsedmarc.utils import timestamp_to_human, human_timestamp_to_datetime from parsedmarc.utils import parse_email -__version__ = "5.0.2" +__version__ = "5.1.0" logger = logging.getLogger("parsedmarc") logger.debug("parsedmarc v{0}".format(__version__)) diff --git a/parsedmarc/cli.py b/parsedmarc/cli.py index dfb1e1b..49f5ac8 100644 --- a/parsedmarc/cli.py +++ b/parsedmarc/cli.py @@ -29,7 +29,11 @@ def _main(): print(output_str) if args.kafka_hosts: try: - kafka_client = kafkaclient.KafkaClient(args.kafka_hosts) + kafka_client = kafkaclient.KafkaClient( + args.kafka_hosts, + username=args.kafka_username, + password=args.kafka_password + ) except Exception as error_: logger.error("Kafka Error: {0}".format(error_.__str__())) if args.save_aggregate: @@ -151,8 +155,20 @@ def _main(): help="Skip certificate verification for Splunk " "HEC") arg_parser.add_argument("-K", "--kafka-hosts", nargs="*", - help="A list of one or more Kafka hostnames" - " or URLs") + help="A list of one or more Kafka hostnames") + arg_parser.add_argument("--kafka-username", + help=""""An optional Kafka username + Note: For Azure Event hub, this is + literally $ConnectionString""") + arg_parser.add_argument("--kafka-password", + help=""""An optional Kafka password + Note: For Azure Event hub, this is + the Azure-provided connection string""") + arg_parser.add_argument("--kafka-use-ssl", + action="store_true", + help="""Use SSL/TLS to connect to Kafka + (implied when ``--kafka-username or + --kafka-password are provided)""") 
arg_parser.add_argument("--kafka-aggregate-topic", help="The Kafka topic to publish aggregate " "reports to (Default: dmarc_aggregate)", diff --git a/parsedmarc/kafkaclient.py b/parsedmarc/kafkaclient.py index 93d34b1..fccf23c 100644 --- a/parsedmarc/kafkaclient.py +++ b/parsedmarc/kafkaclient.py @@ -2,6 +2,7 @@ import logging import json +import ssl from kafka import KafkaProducer from kafka.errors import NoBrokersAvailable, UnknownTopicOrPartitionError @@ -16,16 +17,41 @@ class KafkaError(RuntimeError): class KafkaClient(object): - def __init__(self, kafka_hosts): - try: - self.producer = KafkaProducer( - value_serializer=lambda v: json.dumps(v).encode( + def __init__(self, kafka_hosts, use_ssl=False, username=None, + password=None): + """ + Initializes the Kafka client + Args: + kafka_hosts (list): A list of Kafka hostnames + (with optional port numbers) + use_ssl (bool): Use a SSL/TLS connection + username (str): An optional username + password (str): An optional password + + Notes: + ``use_ssl=True`` is implied when a username or password are + supplied. + + When using Azure Event Hubs, the username is literally + ``$ConnectionString``, and the password is the + Azure Event Hub connection string. 
+ """ + config = dict(value_serializer=lambda v: json.dumps(v).encode( 'utf-8'), - bootstrap_servers=kafka_hosts) + bootstrap_servers=kafka_hosts) + if use_ssl or username or password: + config["security_protocol"] = "SSL" + config["ssl_context"] = ssl.create_default_context() + if username or password: + config["sasl_plain_username"] = username or "" + config["sasl_plain_password"] = password or "" + try: + self.producer = KafkaProducer(**config) except NoBrokersAvailable: raise KafkaError("No Kafka brokers available") - def strip_metadata(self, report): + @staticmethod + def strip_metadata(report): """ Duplicates org_name, org_email and report_id into JSON root and removes report_metadata key to bring it more inline @@ -38,7 +64,8 @@ class KafkaClient(object): return report - def generate_daterange(self, report): + @staticmethod + def generate_daterange(report): """ Creates a date_range timestamp with format YYYY-MM-DD-T-HH:MM:SS based on begin and end dates for easier parsing in Kibana. diff --git a/setup.py b/setup.py index 27bf054..1b5d3ad 100644 --- a/setup.py +++ b/setup.py @@ -14,7 +14,7 @@ from setuptools import setup from codecs import open from os import path -__version__ = "5.0.2" +__version__ = "5.1.0" description = "A Python package and CLI for parsing aggregate and " \ "forensic DMARC reports"