From ebc6a5571510a48313d79b6e63f98c54828408df Mon Sep 17 00:00:00 2001 From: Sean Whalen <44679+seanthegeek@users.noreply.github.com> Date: Fri, 12 Jun 2026 20:25:35 -0400 Subject: [PATCH] Switch from kafka-python-ng to kafka-python>=2.3.2 (#795) (#796) kafka-python-ng is archived and vulnerable to CVE-2026-10142 and CVE-2026-10143, both fixed in upstream kafka-python 2.3.2. kafka-python 3.0 removed the NoBrokersAvailable exception (a failed producer bootstrap now raises KafkaTimeoutError), so kafkaclient.py imports whichever the installed version provides via a compat shim, keeping the >=2.3.2 range honest for both 2.x and 3.x. Verified against kafka-python 3.0.0 (full test suite) and 2.3.2 (import shim resolution). Co-authored-by: Claude Fable 5 --- CHANGELOG.md | 1 + parsedmarc/kafkaclient.py | 12 ++++++++++-- pyproject.toml | 2 +- tests/test_kafkaclient.py | 6 +++--- 4 files changed, 15 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d2b5008..1af4f5f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ - **Output clients are now closed on every exit path** via `atexit` plus a trailing close at the end of `_main()`, fixing a long-standing leak where one-shot CLI runs and graceful shutdowns never flushed Kafka / closed Elasticsearch / S3 / etc. clients. - **Example systemd unit** in `docs/source/usage.md` now sets `KillSignal=SIGTERM` and `TimeoutStopSec=60` so systemd waits long enough for the watcher to drain (keep it above `mailbox_check_timeout`). - Switch the Kafka client dependency from `kafka-python-ng` back to `kafka-python>=2.3.2` ([#795](https://github.com/domainaware/parsedmarc/issues/795)). `kafka-python-ng` was a fork created while `kafka-python` was unmaintained; upstream `kafka-python` is active again, and the now-archived fork is vulnerable to [CVE-2026-10142](https://nvd.nist.gov/vuln/detail/CVE-2026-10142) and [CVE-2026-10143](https://nvd.nist.gov/vuln/detail/CVE-2026-10143), both fixed in `kafka-python` 2.3.2. Both packages install the same `kafka` module, so if you are upgrading an existing environment in place with `pip`, run `pip uninstall kafka-python-ng` before upgrading parsedmarc so the two distributions don't conflict with each other's files. + - parsedmarc is compatible with both `kafka-python` 2.3.2+ and 3.x: `kafka-python` 3.0 removed the `NoBrokersAvailable` exception (a failed bootstrap now raises `KafkaTimeoutError`), and parsedmarc handles whichever the installed version provides. ## 10.0.4 diff --git a/parsedmarc/kafkaclient.py b/parsedmarc/kafkaclient.py index b15dfb5..9ec84d3 100644 --- a/parsedmarc/kafkaclient.py +++ b/parsedmarc/kafkaclient.py @@ -7,7 +7,15 @@ from ssl import SSLContext, create_default_context from typing import Any, Optional, Union from kafka import KafkaProducer -from kafka.errors import NoBrokersAvailable, UnknownTopicOrPartitionError +from kafka.errors import UnknownTopicOrPartitionError + +try: + # kafka-python < 3.0 raises this when the producer cannot bootstrap + from kafka.errors import NoBrokersAvailable as _BootstrapError +except ImportError: + # kafka-python >= 3.0 removed NoBrokersAvailable; a failed bootstrap + # raises KafkaTimeoutError instead + from kafka.errors import KafkaTimeoutError as _BootstrapError from parsedmarc import __version__ from parsedmarc.log import logger @@ -59,7 +67,7 @@ class KafkaClient(object): config["sasl_plain_password"] = password or "" try: self.producer = KafkaProducer(**config) - except NoBrokersAvailable: + except _BootstrapError: raise KafkaError("No Kafka brokers available") def close(self): diff --git a/pyproject.toml b/pyproject.toml index 863873a..89c099c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -39,7 +39,7 @@ dependencies = [ "elasticsearch-dsl==7.4.0", "elasticsearch<7.14.0", "expiringdict>=1.1.4", - "kafka-python-ng>=2.2.2", + "kafka-python>=2.3.2", "lxml>=4.4.0", "mailsuite[gmail,msgraph]>=2.2.1", "maxminddb>=2.0.0", diff --git a/tests/test_kafkaclient.py b/tests/test_kafkaclient.py index 6b0310a..82b4dca 100644 --- a/tests/test_kafkaclient.py +++ b/tests/test_kafkaclient.py @@ -4,9 +4,9 @@ import json import unittest from unittest.mock import MagicMock, patch -from kafka.errors import NoBrokersAvailable, UnknownTopicOrPartitionError +from kafka.errors import UnknownTopicOrPartitionError -from parsedmarc.kafkaclient import KafkaClient, KafkaError +from parsedmarc.kafkaclient import KafkaClient, KafkaError, _BootstrapError def _aggregate_report(): @@ -85,7 +85,7 @@ class TestKafkaClientInit(unittest.TestCase): def test_init_no_brokers_available_raises_kafka_error(self): with patch( "parsedmarc.kafkaclient.KafkaProducer", - side_effect=NoBrokersAvailable(), + side_effect=_BootstrapError(), ): with self.assertRaises(KafkaError) as ctx: KafkaClient(kafka_hosts=["unreachable:9092"])