From 55682bfc461434cbd70a5bd3788371c60f9e61e0 Mon Sep 17 00:00:00 2001 From: "Jens W. Klein" Date: Thu, 16 Nov 2023 12:21:52 +0100 Subject: [PATCH] Remove Support for OpensSearch 1.x and ElasticSearch < 8 --- CHANGES.rst | 4 +- README.rst | 4 +- pyproject.toml | 2 +- src/collective/elastic/ingest/__init__.py | 12 +++-- src/collective/elastic/ingest/analysis.py | 48 ++++++++----------- src/collective/elastic/ingest/elastic.py | 41 ++++++++-------- .../elastic/ingest/ingest/__init__.py | 8 ++-- src/collective/elastic/ingest/mapping.py | 22 +++------ src/collective/elastic/ingest/removal.py | 6 +-- 9 files changed, 66 insertions(+), 81 deletions(-) diff --git a/CHANGES.rst b/CHANGES.rst index c98cf46..5c013dc 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -15,10 +15,10 @@ Changelog [jensens] - Add docker-compose file to start OpensSearch to example directory and move `.env` to example too. [jensens] -- rename `ELASTIC_*` environemnt variables to have an consistent naming scheme, see README for details. [jensens] +- rename `ELASTIC_*` environment variables to have a consistent naming scheme, see README for details. [jensens] - Add tox, Github Actions, CI and CD. [jensens] - Refactor field-map loading to not happen on startup. [jensens] - +- Remove Support for OpenSearch 1.x and ElasticSearch < 8 [jensens] 1.4 (2023-08-17) diff --git a/README.rst b/README.rst index 45b83e9..4192519 100644 --- a/README.rst +++ b/README.rst @@ -2,7 +2,7 @@ collective.elastic.ingest ========================= -Ingestion service queue runner between Plone RestAPI and ElasticSearch or OpenSearch. +Ingestion service queue runner between Plone RestAPI and ElasticSearch 8+ or OpenSearch 2+. Provides Celery-tasks to asynchronous index Plone content. - auto-create Open-/ElasticSearch... 
@@ -29,7 +29,7 @@ Install ``collective.elastic.ingest`` ready to use with redis and opensearch:: Depending on the queue server and index server used, the extra requirements vary: -- index server: ``opensearch``, ``elasticsearch7``, or ``elasticsearch8``. +- index server: ``opensearch``, ``elasticsearch``. - queue server: ``redis`` or ``rabbitmq``. diff --git a/pyproject.toml b/pyproject.toml index 6d34897..45c7324 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -44,7 +44,7 @@ Issues = "https://github.com/collective/collective.elastic.ingest/issues" [project.optional-dependencies] redis = ["celery[redis]"] rabbitmq = ["celery[librabbitmq]"] -opensearch = ["opensearch-py"] +opensearch = ["opensearch-py>=2"] elasticsearch = ["elasticsearch>=8.0"] test = [ "pytest", diff --git a/src/collective/elastic/ingest/__init__.py b/src/collective/elastic/ingest/__init__.py index 4b8d9c9..c82dcc3 100644 --- a/src/collective/elastic/ingest/__init__.py +++ b/src/collective/elastic/ingest/__init__.py @@ -7,9 +7,13 @@ if OPENSEARCH: version_opensearchpy = version("opensearch-py") - OPENSEARCH_2 = int(version_opensearchpy[0]) <= 2 - ELASTICSEARCH_7 = True + if int(version_opensearchpy.split(".")[0]) < 2: + raise ValueError( + "opensearch-py 1.x is not supported, use version 1.x of the collective.elastic.ingest package." + ) else: version_elasticsearch = version("elasticsearch") - ELASTICSEARCH_7 = int(version_elasticsearch[0]) <= 7 - OPENSEARCH_2 = False + if int(version_elasticsearch.split(".")[0]) < 8: + raise ValueError( + "elasticsearch < 8 is not supported, use Version 1.x of the collective.elastic.ingest package." 
+ ) diff --git a/src/collective/elastic/ingest/analysis.py b/src/collective/elastic/ingest/analysis.py index 9473222..273404f 100644 --- a/src/collective/elastic/ingest/analysis.py +++ b/src/collective/elastic/ingest/analysis.py @@ -1,8 +1,6 @@ from .elastic import get_ingest_client from .logging import logger -from collective.elastic.ingest import ELASTICSEARCH_7 from collective.elastic.ingest import OPENSEARCH -from collective.elastic.ingest import OPENSEARCH_2 import json import os @@ -33,28 +31,24 @@ def update_analysis(index_name): Mapping can use analyzers from analysis.json. """ - if ANALYSISMAP: - analysis_settings = ANALYSISMAP.get("settings", {}) - if analysis_settings: - es = get_ingest_client() - if es is None: - logger.warning("No ElasticSearch client available.") - return - if ELASTICSEARCH_7: - index_exists = es.indices.exists(index_name) - else: - index_exists = es.indices.exists(index=index_name) - if index_exists: - return - - logger.info( - f"Create index '{index_name}' with analysis settings " - f"from '{_analysis_file}', but without mapping." - ) - if not OPENSEARCH and ELASTICSEARCH_7 or OPENSEARCH and OPENSEARCH_2: - es.indices.create(index_name, body=ANALYSISMAP) - else: - es.indices.create(index=index_name, settings=analysis_settings) - return - - logger.info("No analyzer configuration found.") + if not ANALYSISMAP: + logger.info("No analyzer configuration given.") + return + analysis_settings = ANALYSISMAP.get("settings", {}) + if not analysis_settings: + logger.warning("No analyzer settings found in configuration.") + return + es = get_ingest_client() + if es is None: + logger.warning("No ElasticSearch client available.") + return + if es.indices.exists(index=index_name): + logger.debug( + f"Analysis for index '{index_name}' already exists, skip creation." + ) + return + logger.info( + f"Create index '{index_name}' with analysis settings " + f"from '{_analysis_file}', but without mapping." 
+ ) + es.indices.create(index=index_name, body=ANALYSISMAP) diff --git a/src/collective/elastic/ingest/elastic.py b/src/collective/elastic/ingest/elastic.py index 07a84cd..a3fc0eb 100644 --- a/src/collective/elastic/ingest/elastic.py +++ b/src/collective/elastic/ingest/elastic.py @@ -1,4 +1,3 @@ -from . import ELASTICSEARCH_7 from . import OPENSEARCH from .logging import logger @@ -11,27 +10,27 @@ from elasticsearch import Elasticsearch -def get_ingest_client(elasticsearch_server_baseurl=None): - """return elasticsearch client for.ingest""" +def get_search_client(elasticsearch_server_baseurl: str = ""): + """search client for query or ingest""" - raw_addr = elasticsearch_server_baseurl or os.environ.get( - "INGEST_SERVER", "http://localhost:9200" - ) - use_ssl = os.environ.get("INGEST_USE_SSL", "0") - use_ssl = bool(int(use_ssl)) + raw_addr = elasticsearch_server_baseurl or os.environ.get("INGEST_SERVER", "") + use_ssl = bool(int(os.environ.get("INGEST_USE_SSL", "0"))) addresses = [x for x in raw_addr.split(",") if x.strip()] if not addresses: addresses.append("127.0.0.1:9200") + # TODO: more auth options (cert, bearer token, api-key, etc) + auth = ( + os.environ.get("INGEST_LOGIN", "admin"), + os.environ.get("INGEST_PASSWORD", "admin"), + ) + if OPENSEARCH: + logger.info(f"Use OpenSearch client at {addresses}") hosts = [] for address in addresses: host, port = address.rsplit(":", 1) hosts.append({"host": host, "port": port}) - auth = ( - os.environ.get("INGEST_LOGIN", "admin"), - os.environ.get("INGEST_PASSWORD", "admin"), - ) client = OpenSearch( hosts=hosts, http_auth=auth, @@ -40,13 +39,17 @@ def get_ingest_client(elasticsearch_server_baseurl=None): ) info = client.info() logger.info(f"OpenSearch client info: {info}") - return client - elif ELASTICSEARCH_7: - from . 
import version_elasticsearch - - logger.info(f"ElasticSearch version {version_elasticsearch} installed") - return Elasticsearch( + else: + logger.info(f"Use ElasticSearch client at {addresses}") + client = Elasticsearch( addresses, use_ssl=use_ssl, + basic_auth=auth, + verify_certs=False, ) - return Elasticsearch(addresses) + return client + + +def get_ingest_client(elasticsearch_server_baseurl=None): + logger.warning("get_ingest_client is deprecated, use get_search_client instead") + return get_search_client(elasticsearch_server_baseurl) diff --git a/src/collective/elastic/ingest/ingest/__init__.py b/src/collective/elastic/ingest/ingest/__init__.py index 5cfc996..8988384 100644 --- a/src/collective/elastic/ingest/ingest/__init__.py +++ b/src/collective/elastic/ingest/ingest/__init__.py @@ -1,3 +1,4 @@ +from .. import OPENSEARCH from ..analysis import update_analysis from ..elastic import get_ingest_client from ..logging import logger @@ -13,9 +14,6 @@ from .section import enrichWithSection from .security import enrichWithSecurityInfo from .vocabularyfields import stripVocabularyTermTitles -from collective.elastic.ingest import ELASTICSEARCH_7 -from collective.elastic.ingest import OPENSEARCH -from collective.elastic.ingest import OPENSEARCH_2 from pprint import pformat @@ -49,8 +47,8 @@ def setup_ingest_pipelines(full_schema, index_name): if pipelines["processors"]: logger.info(f"update ingest pipelines {pipeline_name}") logger.debug(f"pipeline definitions:\n{pipelines}") - if (not OPENSEARCH and ELASTICSEARCH_7) or (OPENSEARCH and OPENSEARCH_2): - es.ingest.put_pipeline(pipeline_name, pipelines) + if OPENSEARCH: + es.ingest.put_pipeline(id=pipeline_name, body=pipelines) else: es.ingest.put_pipeline(id=pipeline_name, processors=pipelines["processors"]) else: diff --git a/src/collective/elastic/ingest/mapping.py b/src/collective/elastic/ingest/mapping.py index aff2037..556bbaa 100644 --- a/src/collective/elastic/ingest/mapping.py +++ 
b/src/collective/elastic/ingest/mapping.py @@ -1,4 +1,3 @@ -from . import ELASTICSEARCH_7 from .elastic import get_ingest_client from .logging import logger from copy import deepcopy @@ -135,10 +134,7 @@ def create_or_update_mapping(full_schema, index_name): return # get current mapping - if ELASTICSEARCH_7: - index_exists = es.indices.exists(index_name) - else: - index_exists = es.indices.exists(index=index_name) + index_exists = es.indices.exists(index=index_name) if index_exists: original_mapping = es.indices.get_mapping(index=index_name)[index_name] mapping = deepcopy(original_mapping) @@ -201,20 +197,14 @@ def create_or_update_mapping(full_schema, index_name): json.dumps(mapping["mappings"], sort_keys=True, indent=2) ) ) - if ELASTICSEARCH_7: - es.indices.put_mapping(index=index_name, body=mapping["mappings"]) - else: - es.indices.put_mapping( - index=[index_name], - body=mapping["mappings"], - ) + es.indices.put_mapping( + index=[index_name], + body=mapping["mappings"], + ) else: logger.debug("No update necessary. Mapping is unchanged.") else: # from celery.contrib import rdb; rdb.set_trace() logger.info("Create index with mapping.") logger.debug(f"mapping is:\n{json.dumps(mapping, sort_keys=True, indent=2)}") - if ELASTICSEARCH_7: - es.indices.create(index_name, body=mapping) - else: - es.indices.create(index=index_name, mappings=mapping["mappings"]) + es.indices.create(index=index_name, mappings=mapping["mappings"]) diff --git a/src/collective/elastic/ingest/removal.py b/src/collective/elastic/ingest/removal.py index 986bcf0..41e87c8 100644 --- a/src/collective/elastic/ingest/removal.py +++ b/src/collective/elastic/ingest/removal.py @@ -1,9 +1,5 @@ from .elastic import get_ingest_client - -import logging - - -logger = logging.getLogger(__name__) +from .logging import logger def remove(uid, index_name):