From bf37ded6880128892467c10b1a9fdc5ac370e5ec Mon Sep 17 00:00:00 2001 From: DVB <104630700+dvbnl@users.noreply.github.com> Date: Thu, 21 May 2026 03:36:19 +0200 Subject: [PATCH] Add support for Elastic Cloud Serverless projects (#770) --- CHANGELOG.md | 4 ++++ docs/source/usage.md | 6 +++++ parsedmarc/cli.py | 6 +++++ parsedmarc/elastic.py | 42 ++++++++++++++++++++++++++++----- tests/test_cli.py | 54 +++++++++++++++++++++++++++++++++++++++++++ tests/test_elastic.py | 48 ++++++++++++++++++++++++++++++++++++++ 6 files changed, 154 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b0bac1c..82f5e30 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,10 @@ ### Enhancements +#### Elastic Cloud Serverless compatibility + +New `[elasticsearch] serverless` config flag (env var `PARSEDMARC_ELASTICSEARCH_SERVERLESS`). Elastic Cloud Serverless manages sharding and replication itself and rejects the `number_of_shards` / `number_of_replicas` index settings with HTTP 400 — previously every write into a Serverless project failed at index-creation time. With the flag set, `create_indexes` strips those two keys from the settings sent to Elasticsearch and passes any other settings (e.g. `refresh_interval`) through unchanged. Non-Serverless deployments are unaffected. + #### Docker-secret support via `_FILE` env vars Any `PARSEDMARC_{SECTION}_{KEY}` environment variable can now also be supplied via a file by appending `_FILE` to its name (e.g. `PARSEDMARC_IMAP_PASSWORD_FILE=/run/secrets/imap_password`). The file's contents (with trailing CR/LF stripped) are used as the value. This is the same convention used by the official Postgres, MariaDB, and Redis container images, so credentials no longer have to appear in plain `environment:` blocks where `docker inspect`, container logs, and `/proc/<pid>/environ` would expose them. 
diff --git a/docs/source/usage.md b/docs/source/usage.md index fdceac9..e86f606 100644 --- a/docs/source/usage.md +++ b/docs/source/usage.md @@ -297,6 +297,12 @@ The full set of configuration options are: creating the index (Default: `1`) - `number_of_replicas` - int: The number of replicas to use when creating the index (Default: `0`) + - `serverless` - bool: Set to `True` when targeting an Elastic Cloud + Serverless project. Serverless manages sharding and replication itself + and rejects the `number_of_shards` / `number_of_replicas` index settings + with HTTP 400. With this flag set, parsedmarc strips those keys from the + settings sent at index creation; any other settings (e.g. + `refresh_interval`) are passed through unchanged (Default: `False`) - `opensearch` - `hosts` - str: A comma separated list of hostnames and ports or URLs (e.g. `127.0.0.1:9200` or diff --git a/parsedmarc/cli.py b/parsedmarc/cli.py index 9be3fd0..5368ce1 100644 --- a/parsedmarc/cli.py +++ b/parsedmarc/cli.py @@ -732,6 +732,10 @@ def _parse_config(config: ConfigParser, opts): # Since 8.20 if "api_key" in elasticsearch_config: opts.elasticsearch_api_key = elasticsearch_config["api_key"] + if "serverless" in elasticsearch_config: + opts.elasticsearch_serverless = elasticsearch_config.getboolean( + "serverless" + ) if "opensearch" in config: opensearch_config = config["opensearch"] @@ -1235,6 +1239,7 @@ def _init_output_clients(opts): password=opts.elasticsearch_password, api_key=opts.elasticsearch_api_key, timeout=elastic_timeout_value, + serverless=opts.elasticsearch_serverless, ) elastic.migrate_indexes( aggregate_indexes=[es_aggregate_index], @@ -1849,6 +1854,7 @@ def _main(): elasticsearch_username=None, elasticsearch_password=None, elasticsearch_api_key=None, + elasticsearch_serverless=False, opensearch_hosts=None, opensearch_timeout=60, opensearch_number_of_shards=1, diff --git a/parsedmarc/elastic.py b/parsedmarc/elastic.py index d0ba414..c5a52d8 100644 --- a/parsedmarc/elastic.py 
+++ b/parsedmarc/elastic.py @@ -31,6 +31,18 @@ class ElasticsearchError(Exception): """Raised when an Elasticsearch error occurs""" +# Mirror of the ``serverless`` flag passed to ``set_hosts``; consulted by +# ``create_indexes`` to strip settings Elastic Cloud Serverless rejects. +# Module-level state is consistent with the existing ``connections.create_connection`` +# global the rest of this module relies on — there is a single default ES +# connection per process. +_SERVERLESS = False + +# Index settings rejected by Elastic Cloud Serverless with HTTP 400. Other +# settings (e.g. ``refresh_interval``) are accepted and pass through. +_SERVERLESS_REJECTED_SETTINGS = frozenset({"number_of_shards", "number_of_replicas"}) + + class _PolicyOverride(InnerDoc): type = Text() comment = Text() @@ -314,6 +326,7 @@ def set_hosts( password: Optional[str] = None, api_key: Optional[str] = None, timeout: float = 60.0, + serverless: bool = False, ): """ Sets the Elasticsearch hosts to use @@ -327,7 +340,14 @@ def set_hosts( password (str): The password to use for authentication api_key (str): The Base64 encoded API key to use for authentication timeout (float): Timeout in seconds + serverless (bool): Target an Elastic Cloud Serverless project. When True, + ``create_indexes`` strips ``number_of_shards`` / ``number_of_replicas`` + from its settings (which Serverless rejects with HTTP 400) and passes + any other settings through unchanged. """ + # Module-global; see the _SERVERLESS comment at the top of the module. + global _SERVERLESS + _SERVERLESS = serverless if not isinstance(hosts, list): hosts = [hosts] conn_params = {"hosts": hosts, "timeout": timeout} @@ -352,18 +372,28 @@ def create_indexes(names: list[str], settings: Optional[dict[str, Any]] = None): Args: names (list): A list of index names - settings (dict): Index settings - + settings (dict): Index settings. 
In Serverless mode, keys in + ``_SERVERLESS_REJECTED_SETTINGS`` are filtered out and the + remaining keys are passed through; defaults are skipped entirely. """ + if settings is None: + effective_settings: dict[str, Any] = ( + {} if _SERVERLESS else {"number_of_shards": 1, "number_of_replicas": 0} + ) + elif _SERVERLESS: + effective_settings = { + k: v for k, v in settings.items() if k not in _SERVERLESS_REJECTED_SETTINGS + } + else: + effective_settings = dict(settings) + for name in names: index = Index(name) try: if not index.exists(): logger.debug("Creating Elasticsearch index: {0}".format(name)) - if settings is None: - index.settings(number_of_shards=1, number_of_replicas=0) - else: - index.settings(**settings) + if effective_settings: + index.settings(**effective_settings) index.create() except Exception as e: raise ElasticsearchError("Elasticsearch error: {0}".format(e.__str__())) diff --git a/tests/test_cli.py b/tests/test_cli.py index 885b384..03c2154 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -2232,6 +2232,60 @@ class TestParseConfigElasticsearch(unittest.TestCase): _parse_config(cp, _opts()) self.assertIn("hosts", str(ctx.exception)) + def test_elasticsearch_serverless_flag(self): + """``[elasticsearch] serverless = true`` flips ``opts.elasticsearch_serverless``.""" + from parsedmarc.cli import _parse_config + + cp = _config_with("elasticsearch", {"hosts": "es:9200", "serverless": "true"}) + opts = _opts() + _parse_config(cp, opts) + self.assertIs(opts.elasticsearch_serverless, True) + + def test_elasticsearch_serverless_passed_to_set_hosts(self): + """End-to-end: a Serverless config reaches ``elastic.set_hosts(serverless=True)``. + + Regression guard: catches anyone who later parses the flag but forgets + to plumb it through to ``set_hosts`` (or vice-versa). 
+ """ + config = """[general] +save_aggregate = true +silent = true + +[imap] +host = imap.example.com +user = test-user +password = test-password + +[elasticsearch] +hosts = localhost +serverless = true +""" + with tempfile.NamedTemporaryFile( + "w", suffix=".ini", delete=False + ) as config_file: + config_file.write(config) + config_path = config_file.name + self.addCleanup(lambda: os.path.exists(config_path) and os.remove(config_path)) + + with ( + patch("parsedmarc.cli.elastic.migrate_indexes"), + patch("parsedmarc.cli.elastic.set_hosts") as mock_set_hosts, + patch( + "parsedmarc.cli.get_dmarc_reports_from_mailbox", + return_value={ + "aggregate_reports": [], + "failure_reports": [], + "smtp_tls_reports": [], + }, + ), + patch("parsedmarc.cli.IMAPConnection", return_value=object()), + patch.object(sys, "argv", ["parsedmarc", "-c", config_path]), + ): + parsedmarc.cli._main() + + mock_set_hosts.assert_called_once() + self.assertIs(mock_set_hosts.call_args.kwargs.get("serverless"), True) + class TestParseConfigOpenSearch(unittest.TestCase): def test_opensearch_basic(self): diff --git a/tests/test_elastic.py b/tests/test_elastic.py index 1677952..06dec23 100644 --- a/tests/test_elastic.py +++ b/tests/test_elastic.py @@ -320,6 +320,54 @@ class TestCreateIndexes(unittest.TestCase): self.assertIn("cluster down", str(ctx.exception)) +class TestCreateIndexesServerless(unittest.TestCase): + """Serverless mode strips shard/replica keys but keeps everything else. + + Elastic Cloud Serverless rejects ``number_of_shards`` and + ``number_of_replicas`` with HTTP 400. Other settings like + ``refresh_interval`` are accepted and must pass through unchanged. 
+ """ + + def setUp(self): + self._original = elastic_module._SERVERLESS + elastic_module._SERVERLESS = True + + def tearDown(self): + elastic_module._SERVERLESS = self._original + + def test_serverless_default_skips_settings_entirely(self): + with patch("parsedmarc.elastic.Index") as mock_index_cls: + mock_index = mock_index_cls.return_value + mock_index.exists.return_value = False + create_indexes(["idx"]) + mock_index.settings.assert_not_called() + mock_index.create.assert_called_once() + + def test_serverless_filters_rejected_keys_and_passes_others_through(self): + with patch("parsedmarc.elastic.Index") as mock_index_cls: + mock_index = mock_index_cls.return_value + mock_index.exists.return_value = False + create_indexes( + ["idx"], + settings={ + "number_of_shards": 3, + "number_of_replicas": 2, + "refresh_interval": "5s", + }, + ) + mock_index.settings.assert_called_once_with(refresh_interval="5s") + + def test_serverless_skips_settings_when_only_rejected_keys(self): + with patch("parsedmarc.elastic.Index") as mock_index_cls: + mock_index = mock_index_cls.return_value + mock_index.exists.return_value = False + create_indexes( + ["idx"], settings={"number_of_shards": 3, "number_of_replicas": 2} + ) + mock_index.settings.assert_not_called() + mock_index.create.assert_called_once() + + # --------------------------------------------------------------------------- # migrate_indexes # ---------------------------------------------------------------------------