diff --git a/docs/source/usage.md b/docs/source/usage.md index 8b756b3..1622bf5 100644 --- a/docs/source/usage.md +++ b/docs/source/usage.md @@ -240,6 +240,7 @@ The full set of configuration options are: - `timeout` - float: Timeout in seconds (Default: 60) - `cert_path` - str: Path to a trusted certificates - `index_suffix` - str: A suffix to apply to the index names + - `index_prefix` - str: A prefix to apply to the index names - `monthly_indexes` - bool: Use monthly indexes instead of daily indexes - `number_of_shards` - int: The number of shards to use when creating the index (Default: `1`) @@ -262,6 +263,7 @@ The full set of configuration options are: - `timeout` - float: Timeout in seconds (Default: 60) - `cert_path` - str: Path to a trusted certificates - `index_suffix` - str: A suffix to apply to the index names + - `index_prefix` - str: A prefix to apply to the index names - `monthly_indexes` - bool: Use monthly indexes instead of daily indexes - `number_of_shards` - int: The number of shards to use when creating the index (Default: `1`) diff --git a/parsedmarc/cli.py b/parsedmarc/cli.py index d2ebae6..f47fae9 100644 --- a/parsedmarc/cli.py +++ b/parsedmarc/cli.py @@ -92,6 +92,7 @@ def _main(): elastic.save_aggregate_report_to_elasticsearch( report, index_suffix=opts.elasticsearch_index_suffix, + index_prefix=opts.elasticsearch_index_prefix, monthly_indexes=opts.elasticsearch_monthly_indexes, number_of_shards=shards, number_of_replicas=replicas @@ -112,6 +113,7 @@ def _main(): opensearch.save_aggregate_report_to_opensearch( report, index_suffix=opts.opensearch_index_suffix, + index_prefix=opts.opensearch_index_prefix, monthly_indexes=opts.opensearch_monthly_indexes, number_of_shards=shards, number_of_replicas=replicas @@ -163,6 +165,7 @@ def _main(): elastic.save_forensic_report_to_elasticsearch( report, index_suffix=opts.elasticsearch_index_suffix, + index_prefix=opts.elasticsearch_index_prefix, monthly_indexes=opts.elasticsearch_monthly_indexes, number_of_shards=shards, number_of_replicas=replicas) @@ -181,6 +184,7 @@ def _main(): opensearch.save_forensic_report_to_opensearch( report, index_suffix=opts.opensearch_index_suffix, + index_prefix=opts.opensearch_index_prefix, monthly_indexes=opts.opensearch_monthly_indexes, number_of_shards=shards, number_of_replicas=replicas) @@ -230,6 +234,7 @@ def _main(): elastic.save_smtp_tls_report_to_elasticsearch( report, index_suffix=opts.elasticsearch_index_suffix, + index_prefix=opts.elasticsearch_index_prefix, monthly_indexes=opts.elasticsearch_monthly_indexes, number_of_shards=shards, number_of_replicas=replicas) @@ -248,6 +253,7 @@ def _main(): opensearch.save_smtp_tls_report_to_opensearch( report, index_suffix=opts.opensearch_index_suffix, + index_prefix=opts.opensearch_index_prefix, monthly_indexes=opts.opensearch_monthly_indexes, number_of_shards=shards, number_of_replicas=replicas) @@ -429,6 +435,7 @@ def _main(): elasticsearch_number_of_shards=1, elasticsearch_number_of_replicas=0, elasticsearch_index_suffix=None, + elasticsearch_index_prefix=None, elasticsearch_ssl=True, elasticsearch_ssl_cert_path=None, elasticsearch_monthly_indexes=False, @@ -440,6 +447,7 @@ def _main(): opensearch_number_of_shards=1, opensearch_number_of_replicas=0, opensearch_index_suffix=None, + opensearch_index_prefix=None, opensearch_ssl=True, opensearch_ssl_cert_path=None, opensearch_monthly_indexes=False, @@ -750,6 +758,9 @@ def _main(): if "index_suffix" in elasticsearch_config: opts.elasticsearch_index_suffix = elasticsearch_config[ "index_suffix"] + if "index_prefix" in elasticsearch_config: + opts.elasticsearch_index_prefix = elasticsearch_config[ + "index_prefix"] if "monthly_indexes" in elasticsearch_config: monthly = elasticsearch_config.getboolean("monthly_indexes") opts.elasticsearch_monthly_indexes = monthly @@ -792,6 +803,9 @@ def _main(): if "index_suffix" in opensearch_config: opts.opensearch_index_suffix = opensearch_config[ "index_suffix"] + if "index_prefix" in opensearch_config: + opts.opensearch_index_prefix = opensearch_config[ + "index_prefix"] if "monthly_indexes" in opensearch_config: monthly = opensearch_config.getboolean("monthly_indexes") opts.opensearch_monthly_indexes = monthly @@ -1037,6 +1051,15 @@ def _main(): es_smtp_tls_index = "{0}_{1}".format( es_smtp_tls_index, suffix ) + if opts.elasticsearch_index_prefix: + prefix = opts.elasticsearch_index_prefix + es_aggregate_index = "{0}{1}".format( + prefix, es_aggregate_index) + es_forensic_index = "{0}{1}".format( + prefix, es_forensic_index) + es_smtp_tls_index = "{0}{1}".format( + prefix, es_smtp_tls_index + ) elastic.set_hosts(opts.elasticsearch_hosts, opts.elasticsearch_ssl, opts.elasticsearch_ssl_cert_path, @@ -1064,6 +1087,15 @@ def _main(): os_smtp_tls_index = "{0}_{1}".format( os_smtp_tls_index, suffix ) + if opts.opensearch_index_prefix: + prefix = opts.opensearch_index_prefix + os_aggregate_index = "{0}{1}".format( + prefix, os_aggregate_index) + os_forensic_index = "{0}{1}".format( + prefix, os_forensic_index) + os_smtp_tls_index = "{0}{1}".format( + prefix, os_smtp_tls_index + ) opensearch.set_hosts(opts.opensearch_hosts, opts.opensearch_ssl, opts.opensearch_ssl_cert_path, diff --git a/parsedmarc/elastic.py b/parsedmarc/elastic.py index 8a63ded..93ce05d 100644 --- a/parsedmarc/elastic.py +++ b/parsedmarc/elastic.py @@ -350,6 +350,7 @@ def migrate_indexes(aggregate_indexes=None, forensic_indexes=None): def save_aggregate_report_to_elasticsearch(aggregate_report, index_suffix=None, + index_prefix=None, monthly_indexes=False, number_of_shards=1, number_of_replicas=0): @@ -359,6 +360,7 @@ def save_aggregate_report_to_elasticsearch(aggregate_report, Args: aggregate_report (OrderedDict): A parsed forensic report index_suffix (str): The suffix of the name of the index to save to + index_prefix (str): The prefix of the name of the index to save to monthly_indexes (bool): Use monthly indexes instead of daily indexes number_of_shards (int): The number of shards to use in the index number_of_replicas (int): The number of replicas to use in the index @@ -394,9 +396,12 @@ def save_aggregate_report_to_elasticsearch(aggregate_report, end_date_query = Q(dict(match=dict(date_end=end_date))) if index_suffix is not None: - search = Search(index="dmarc_aggregate_{0}*".format(index_suffix)) + search_index = "dmarc_aggregate_{0}*".format(index_suffix) else: - search = Search(index="dmarc_aggregate*") + search_index = "dmarc_aggregate*" + if index_prefix is not None: + search_index = "{0}{1}".format(index_prefix, search_index) + search = Search(index=search_index) query = org_name_query & report_id_query & domain_query query = query & begin_date_query & end_date_query search.query = query @@ -472,6 +477,9 @@ def save_aggregate_report_to_elasticsearch(aggregate_report, index = "dmarc_aggregate" if index_suffix: index = "{0}_{1}".format(index, index_suffix) + if index_prefix: + index = "{0}{1}".format(index_prefix, index) + index = "{0}-{1}".format(index, index_date) index_settings = dict(number_of_shards=number_of_shards, number_of_replicas=number_of_replicas) @@ -487,6 +495,7 @@ def save_aggregate_report_to_elasticsearch(aggregate_report, def save_forensic_report_to_elasticsearch(forensic_report, index_suffix=None, + index_prefix=None, monthly_indexes=False, number_of_shards=1, number_of_replicas=0): @@ -496,6 +505,7 @@ def save_forensic_report_to_elasticsearch(forensic_report, Args: forensic_report (OrderedDict): A parsed forensic report index_suffix (str): The suffix of the name of the index to save to + index_prefix (str): The prefix of the name of the index to save to monthly_indexes (bool): Use monthly indexes instead of daily indexes number_of_shards (int): The number of shards to use in the index @@ -521,9 +531,12 @@ def save_forensic_report_to_elasticsearch(forensic_report, arrival_date = human_timestamp_to_datetime(arrival_date_human) if index_suffix is not None: - search = Search(index="dmarc_forensic_{0}*".format(index_suffix)) + search_index = "dmarc_forensic_{0}*".format(index_suffix) else: - search = Search(index="dmarc_forensic*") + search_index = "dmarc_forensic*" + if index_prefix is not None: + search_index = "{0}{1}".format(index_prefix, search_index) + search = Search(index=search_index) arrival_query = {"match": {"arrival_date": arrival_date}} q = Q(arrival_query) @@ -609,6 +622,8 @@ def save_forensic_report_to_elasticsearch(forensic_report, index = "dmarc_forensic" if index_suffix: index = "{0}_{1}".format(index, index_suffix) + if index_prefix: + index = "{0}{1}".format(index_prefix, index) if monthly_indexes: index_date = arrival_date.strftime("%Y-%m") else: @@ -630,6 +645,7 @@ def save_forensic_report_to_elasticsearch(forensic_report, def save_smtp_tls_report_to_elasticsearch(report, index_suffix=None, + index_prefix=None, monthly_indexes=False, number_of_shards=1, number_of_replicas=0): @@ -639,6 +655,7 @@ def save_smtp_tls_report_to_elasticsearch(report, Args: report (OrderedDict): A parsed SMTP TLS report index_suffix (str): The suffix of the name of the index to save to + index_prefix (str): The prefix of the name of the index to save to monthly_indexes (bool): Use monthly indexes instead of daily indexes number_of_shards (int): The number of shards to use in the index number_of_replicas (int): The number of replicas to use in the index @@ -668,9 +685,12 @@ def save_smtp_tls_report_to_elasticsearch(report, end_date_query = Q(dict(match=dict(date_end=end_date))) if index_suffix is not None: - search = Search(index="smtp_tls_{0}*".format(index_suffix)) + search_index = "smtp_tls_{0}*".format(index_suffix) else: - search = Search(index="smtp_tls*") + search_index = "smtp_tls*" + if index_prefix is not None: + search_index = "{0}{1}".format(index_prefix, search_index) + search = Search(index=search_index) query = org_name_query & report_id_query query = query & begin_date_query & end_date_query search.query = query @@ -691,6 +711,8 @@ def save_smtp_tls_report_to_elasticsearch(report, index = "smtp_tls" if index_suffix: index = "{0}_{1}".format(index, index_suffix) + if index_prefix: + index = "{0}{1}".format(index_prefix, index) index = "{0}-{1}".format(index, index_date) index_settings = dict(number_of_shards=number_of_shards, number_of_replicas=number_of_replicas) diff --git a/parsedmarc/opensearch.py b/parsedmarc/opensearch.py index 5e777ed..f8a7b1e 100644 --- a/parsedmarc/opensearch.py +++ b/parsedmarc/opensearch.py @@ -346,6 +346,7 @@ def migrate_indexes(aggregate_indexes=None, forensic_indexes=None): def save_aggregate_report_to_opensearch(aggregate_report, index_suffix=None, + index_prefix=None, monthly_indexes=False, number_of_shards=1, number_of_replicas=0): @@ -355,6 +356,7 @@ def save_aggregate_report_to_opensearch(aggregate_report, Args: aggregate_report (OrderedDict): A parsed forensic report index_suffix (str): The suffix of the name of the index to save to + index_prefix (str): The prefix of the name of the index to save to monthly_indexes (bool): Use monthly indexes instead of daily indexes number_of_shards (int): The number of shards to use in the index number_of_replicas (int): The number of replicas to use in the index @@ -390,9 +392,12 @@ def save_aggregate_report_to_opensearch(aggregate_report, end_date_query = Q(dict(match=dict(date_end=end_date))) if index_suffix is not None: - search = Search(index="dmarc_aggregate_{0}*".format(index_suffix)) + search_index = "dmarc_aggregate_{0}*".format(index_suffix) else: - search = Search(index="dmarc_aggregate*") + search_index = "dmarc_aggregate*" + if index_prefix is not None: + search_index = "{0}{1}".format(index_prefix, search_index) + search = Search(index=search_index) query = org_name_query & report_id_query & domain_query query = query & begin_date_query & end_date_query search.query = query @@ -468,6 +473,8 @@ def save_aggregate_report_to_opensearch(aggregate_report, index = "dmarc_aggregate" if index_suffix: index = "{0}_{1}".format(index, index_suffix) + if index_prefix: + index = "{0}{1}".format(index_prefix, index) index = "{0}-{1}".format(index, index_date) index_settings = dict(number_of_shards=number_of_shards, number_of_replicas=number_of_replicas) @@ -483,6 +490,7 @@ def save_aggregate_report_to_opensearch(aggregate_report, def save_forensic_report_to_opensearch(forensic_report, index_suffix=None, + index_prefix=None, monthly_indexes=False, number_of_shards=1, number_of_replicas=0): @@ -492,6 +500,7 @@ def save_forensic_report_to_opensearch(forensic_report, Args: forensic_report (OrderedDict): A parsed forensic report index_suffix (str): The suffix of the name of the index to save to + index_prefix (str): The prefix of the name of the index to save to monthly_indexes (bool): Use monthly indexes instead of daily indexes number_of_shards (int): The number of shards to use in the index @@ -517,9 +526,12 @@ def save_forensic_report_to_opensearch(forensic_report, arrival_date = human_timestamp_to_datetime(arrival_date_human) if index_suffix is not None: - search = Search(index="dmarc_forensic_{0}*".format(index_suffix)) + search_index = "dmarc_forensic_{0}*".format(index_suffix) else: - search = Search(index="dmarc_forensic*") + search_index = "dmarc_forensic*" + if index_prefix is not None: + search_index = "{0}{1}".format(index_prefix, search_index) + search = Search(index=search_index) arrival_query = {"match": {"arrival_date": arrival_date}} q = Q(arrival_query) @@ -603,6 +615,8 @@ def save_forensic_report_to_opensearch(forensic_report, index = "dmarc_forensic" if index_suffix: index = "{0}_{1}".format(index, index_suffix) + if index_prefix: + index = "{0}{1}".format(index_prefix, index) if monthly_indexes: index_date = arrival_date.strftime("%Y-%m") else: @@ -624,6 +638,7 @@ def save_forensic_report_to_opensearch(forensic_report, def save_smtp_tls_report_to_opensearch(report, index_suffix=None, + index_prefix=None, monthly_indexes=False, number_of_shards=1, number_of_replicas=0): @@ -633,6 +648,7 @@ def save_smtp_tls_report_to_opensearch(report, Args: report (OrderedDict): A parsed SMTP TLS report index_suffix (str): The suffix of the name of the index to save to + index_prefix (str): The prefix of the name of the index to save to monthly_indexes (bool): Use monthly indexes instead of daily indexes number_of_shards (int): The number of shards to use in the index number_of_replicas (int): The number of replicas to use in the index @@ -662,9 +678,12 @@ def save_smtp_tls_report_to_opensearch(report, end_date_query = Q(dict(match=dict(date_end=end_date))) if index_suffix is not None: - search = Search(index="smtp_tls_{0}*".format(index_suffix)) + search_index = "smtp_tls_{0}*".format(index_suffix) else: - search = Search(index="smtp_tls") + search_index = "smtp_tls*" + if index_prefix is not None: + search_index = "{0}{1}".format(index_prefix, search_index) + search = Search(index=search_index) query = org_name_query & report_id_query query = query & begin_date_query & end_date_query search.query = query @@ -685,6 +704,8 @@ def save_smtp_tls_report_to_opensearch(report, index = "smtp_tls" if index_suffix: index = "{0}_{1}".format(index, index_suffix) + if index_prefix: + index = "{0}{1}".format(index_prefix, index) index = "{0}-{1}".format(index, index_date) index_settings = dict(number_of_shards=number_of_shards, number_of_replicas=number_of_replicas)