diff --git a/.drone.jsonnet b/.drone.jsonnet index 0e10ea2282..9990702394 100644 --- a/.drone.jsonnet +++ b/.drone.jsonnet @@ -176,7 +176,7 @@ local Pipeline(branch, platform, event, arch="amd64", server="10.6-enterprise", publish(step_prefix="pkg", eventp=event + "/${DRONE_BUILD_NUMBER}"):: { name: "publish " + step_prefix, depends_on: [std.strReplace(step_prefix, " latest", ""), "createrepo"], - image: "amazon/aws-cli", + image: "amazon/aws-cli:2.23.5", when: { status: ["success", "failure"], }, @@ -271,7 +271,7 @@ local Pipeline(branch, platform, event, arch="amd64", server="10.6-enterprise", smoke:: { name: "smoke", depends_on: ["publish pkg"], - image: "docker", + image: "docker:28.2.2", volumes: [pipeline._volumes.mdb, pipeline._volumes.docker], commands: [ prepareTestStage(getContainerName("smoke"), result, true), @@ -281,7 +281,7 @@ local Pipeline(branch, platform, event, arch="amd64", server="10.6-enterprise", smokelog:: { name: "smokelog", depends_on: ["smoke"], - image: "docker", + image: "docker:28.2.2", volumes: [pipeline._volumes.docker, pipeline._volumes.mdb], commands: [ reportTestStage(getContainerName("smoke"), result, "smoke"), @@ -293,7 +293,7 @@ local Pipeline(branch, platform, event, arch="amd64", server="10.6-enterprise", upgrade(version):: { name: "upgrade-test from " + version, depends_on: ["regressionlog"], - image: "docker", + image: "docker:28.2.2", volumes: [pipeline._volumes.docker], environment: { UPGRADE_TOKEN: { @@ -314,7 +314,7 @@ local Pipeline(branch, platform, event, arch="amd64", server="10.6-enterprise", upgradelog:: { name: "upgradelog", depends_on: std.map(function(p) "upgrade-test from " + p, mdb_server_versions), - image: "docker", + image: "docker:28.2.2", volumes: [pipeline._volumes.docker, pipeline._volumes.mdb], commands: ["echo"] + @@ -377,7 +377,7 @@ local Pipeline(branch, platform, event, arch="amd64", server="10.6-enterprise", mtrlog:: { name: "mtrlog", depends_on: ["mtr"], - image: "docker", + image: 
"docker:28.2.2", volumes: [pipeline._volumes.docker, pipeline._volumes.mdb], commands: [ reportTestStage(getContainerName("mtr"), result, "mtr"), @@ -467,7 +467,7 @@ local Pipeline(branch, platform, event, arch="amd64", server="10.6-enterprise", regressionlog:: { name: "regressionlog", depends_on: [regression_tests[std.length(regression_tests) - 1]], - image: "docker", + image: "docker:28.2.2", volumes: [pipeline._volumes.docker, pipeline._volumes.mdb], commands: [ reportTestStage(getContainerName("regression"), result, "regression"), @@ -480,7 +480,7 @@ local Pipeline(branch, platform, event, arch="amd64", server="10.6-enterprise", name: "dockerfile", depends_on: ["publish pkg", "publish cmapi build"], //failure: 'ignore', - image: "alpine/git", + image: "alpine/git:2.49.0", environment: { DOCKER_BRANCH_REF: "${DRONE_SOURCE_BRANCH}", DOCKER_REF_AUX: branch_ref, @@ -554,7 +554,7 @@ local Pipeline(branch, platform, event, arch="amd64", server="10.6-enterprise", cmapilog:: { name: "cmapilog", depends_on: ["cmapi test"], - image: "docker", + image: "docker:28.2.2", volumes: [pipeline._volumes.docker, pipeline._volumes.mdb], commands: [ reportTestStage(getContainerName("cmapi"), result, "cmapi"), @@ -567,7 +567,7 @@ local Pipeline(branch, platform, event, arch="amd64", server="10.6-enterprise", name: "mtr", depends_on: ["dockerhub"], //failure: 'ignore', - image: "docker", + image: "docker:28.2.2", volumes: [pipeline._volumes.docker], environment: { DOCKER_LOGIN: { @@ -604,7 +604,7 @@ local Pipeline(branch, platform, event, arch="amd64", server="10.6-enterprise", steps: [ { name: "submodules", - image: "alpine/git", + image: "alpine/git:2.49.0", commands: [ "git submodule update --init --recursive", "git config cmake.update-submodules no", @@ -613,7 +613,7 @@ local Pipeline(branch, platform, event, arch="amd64", server="10.6-enterprise", }, { name: "clone-mdb", - image: "alpine/git", + image: "alpine/git:2.49.0", volumes: [pipeline._volumes.mdb], environment: { 
SERVER_REF: "${SERVER_REF:-" + server + "}", @@ -662,7 +662,7 @@ local Pipeline(branch, platform, event, arch="amd64", server="10.6-enterprise", + get_sccache + customEnvCommands(customBuildEnvCommandsMapKey, builddir) + [ - 'bash -c "set -o pipefail && bash /mdb/' + builddir + "/storage/columnstore/columnstore/build/bootstrap_mcs.sh " + + 'bash -c "set -o pipefail && bash /mdb/' + builddir + "/storage/columnstore/columnstore/build/bootstrap_mcs.sh " + "--build-type RelWithDebInfo " + "--distro " + platform + " " + "--build-packages --install-deps --sccache " + @@ -714,7 +714,7 @@ local Pipeline(branch, platform, event, arch="amd64", server="10.6-enterprise", { name: "pkg", depends_on: ["unittests"], - image: "alpine/git", + image: "alpine/git:2.49.0", when: { status: ["success", "failure"], }, diff --git a/CMakeLists.txt b/CMakeLists.txt index 093fb1f930..ec22f91d0a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -47,6 +47,7 @@ include(columnstore_version) include(configureEngine) include(compiler_flags) include(misc) +include(cpack_manage) set(COMPONENTS dbcon/mysql diff --git a/build/bootstrap_mcs.sh b/build/bootstrap_mcs.sh index dff4aea4d9..8fe38dd367 100755 --- a/build/bootstrap_mcs.sh +++ b/build/bootstrap_mcs.sh @@ -742,8 +742,7 @@ if [[ $BUILD_PACKAGES = true ]]; then modify_packaging build_package message_splitted "PACKAGES BUILD FINISHED" - - return 0 + exit 0 fi stop_service diff --git a/cmake/cpack_manage.cmake b/cmake/cpack_manage.cmake new file mode 100644 index 0000000000..e463db9cfb --- /dev/null +++ b/cmake/cpack_manage.cmake @@ -0,0 +1,33 @@ +macro(columnstore_append_for_cpack var_name) + # Get current value from parent scope or use empty string + if(DEFINED ${var_name}) + set(current_val "${${var_name}}") + else() + set(current_val "") + endif() + + # Process each argument to append + foreach(arg IN LISTS ARGN) + if(current_val) + # If not empty, add comma before new item + set(current_val "${current_val}, ${arg}") + else() + # If empty, 
just add the item + set(current_val "${arg}") + endif() + endforeach() + + # Set back in parent scope. NOTE(review): this is a macro, so it runs in the caller's scope and PARENT_SCOPE targets the caller's *parent* scope; at top-level scope this warns and sets nothing — confirm this file is only include()d from a scope that has a parent, or use function() instead + set(${var_name} + "${current_val}" + PARENT_SCOPE + ) +endmacro() + +macro(columnstore_add_rpm_deps) + columnstore_append_for_cpack(CPACK_RPM_columnstore-engine_PACKAGE_REQUIRES ${ARGN}) +endmacro() + +if(RPM) + columnstore_add_rpm_deps("snappy" "jemalloc" "procps-ng" "gawk") +endif() diff --git a/cmapi/cmapi_server/constants.py b/cmapi/cmapi_server/constants.py index 464b61d99a..800317bb5a 100644 --- a/cmapi/cmapi_server/constants.py +++ b/cmapi/cmapi_server/constants.py @@ -4,6 +4,7 @@ """ import os from typing import NamedTuple +from enum import Enum # default MARIADB ColumnStore config path @@ -53,6 +54,16 @@ CMAPI_INSTALL_PATH, 'cmapi_server/SingleNode.xml' ) +class MCSProgs(Enum): + STORAGE_MANAGER = 'StorageManager' + WORKER_NODE = 'workernode' + CONTROLLER_NODE = 'controllernode' + PRIM_PROC = 'PrimProc' + # EXE_MGR = 'ExeMgr' + WRITE_ENGINE_SERVER = 'WriteEngineServer' + DML_PROC = 'DMLProc' + DDL_PROC = 'DDLProc' + # constants for dispatchers class ProgInfo(NamedTuple): """NamedTuple for some additional info about handling mcs processes.""" @@ -66,17 +77,17 @@ class ProgInfo(NamedTuple): # on top level of process handling # mcs-storagemanager starts conditionally inside mcs-loadbrm, but should be # stopped using cmapi -ALL_MCS_PROGS = { +ALL_MCS_PROGS: dict[str, ProgInfo] = { # workernode starts on primary and non primary node with 1 or 2 added # to subcommand (DBRM_Worker1 - on primary, DBRM_Worker2 - non primary) - 'StorageManager': ProgInfo(15, 'mcs-storagemanager', '', False, 1), - 'workernode': ProgInfo(13, 'mcs-workernode', 'DBRM_Worker{}', False, 1), - 'controllernode': ProgInfo(11, 'mcs-controllernode', 'fg', True), - 'PrimProc': ProgInfo(5, 'mcs-primproc', '', False, 1), - 'ExeMgr': ProgInfo(9, 'mcs-exemgr', '', False, 1), - 'WriteEngineServer': ProgInfo(7, 'mcs-writeengineserver', '', False, 3), - 'DMLProc': ProgInfo(3, 'mcs-dmlproc', '', 
False), - 'DDLProc': ProgInfo(1, 'mcs-ddlproc', '', False), + MCSProgs.STORAGE_MANAGER.value: ProgInfo(15, 'mcs-storagemanager', '', False, 1), + MCSProgs.WORKER_NODE.value: ProgInfo(13, 'mcs-workernode', 'DBRM_Worker{}', False, 1), + MCSProgs.CONTROLLER_NODE.value: ProgInfo(11, 'mcs-controllernode', 'fg', True), + MCSProgs.PRIM_PROC.value: ProgInfo(5, 'mcs-primproc', '', False, 1), + # MCSProgs.EXE_MGR.value: ProgInfo(9, 'mcs-exemgr', '', False, 1), + MCSProgs.WRITE_ENGINE_SERVER.value: ProgInfo(7, 'mcs-writeengineserver', '', False, 3), + MCSProgs.DML_PROC.value: ProgInfo(3, 'mcs-dmlproc', '', False), + MCSProgs.DDL_PROC.value: ProgInfo(1, 'mcs-ddlproc', '', False), } # constants for docker container dispatcher diff --git a/cmapi/cmapi_server/controllers/api_clients.py b/cmapi/cmapi_server/controllers/api_clients.py index 7b7e696220..d7ea2cf80a 100644 --- a/cmapi/cmapi_server/controllers/api_clients.py +++ b/cmapi/cmapi_server/controllers/api_clients.py @@ -68,6 +68,7 @@ def add_node( :param node_info: Information about the node to add. :return: The response from the API. 
""" + #TODO: fix interface as in remove_node used or think about universal return self._request('PUT', 'node', {**node_info, **extra}) def remove_node( diff --git a/cmapi/cmapi_server/controllers/endpoints.py b/cmapi/cmapi_server/controllers/endpoints.py index 870e07b350..48443957f6 100644 --- a/cmapi/cmapi_server/controllers/endpoints.py +++ b/cmapi/cmapi_server/controllers/endpoints.py @@ -434,7 +434,8 @@ def put_config(self): MCSProcessManager.stop_node( is_primary=node_config.is_primary_node(), use_sudo=use_sudo, - timeout=request_timeout + timeout=request_timeout, + is_read_only=node_config.is_read_only(), ) except CMAPIBasicError as err: raise_422_error( @@ -463,6 +464,7 @@ def put_config(self): MCSProcessManager.start_node( is_primary=node_config.is_primary_node(), use_sudo=use_sudo, + is_read_only=node_config.is_read_only(), ) except CMAPIBasicError as err: raise_422_error( @@ -666,7 +668,8 @@ def put_start(self): try: MCSProcessManager.start_node( is_primary=node_config.is_primary_node(), - use_sudo=use_sudo + use_sudo=use_sudo, + is_read_only=node_config.is_read_only(), ) except CMAPIBasicError as err: raise_422_error( @@ -701,7 +704,8 @@ def put_shutdown(self): MCSProcessManager.stop_node( is_primary=node_config.is_primary_node(), use_sudo=use_sudo, - timeout=timeout + timeout=timeout, + is_read_only=node_config.is_read_only(), ) except CMAPIBasicError as err: raise_422_error( @@ -744,6 +748,9 @@ def get_brm_bytes(self, element:str): ret = subprocess.run(args, stdout=subprocess.PIPE) if ret.returncode != 0: module_logger.warning(f"{func_name} got error code {ret.returncode} from smcat, retrying") + stderr = ret.stderr + module_logger.warning(f"{func_name} smcat stderr: {stderr}") + module_logger.warning(f"{func_name} smcat: {args}") time.sleep(1) retry_count += 1 continue @@ -763,6 +770,8 @@ def get_brm_bytes(self, element:str): ret = subprocess.run(args, stdout=subprocess.PIPE) if ret.returncode != 0: module_logger.warning(f"{func_name} got error code 
{ret.returncode} from smcat, retrying") + stderr = ret.stderr + module_logger.warning(f"{func_name} smcat stderr: {stderr}") time.sleep(1) retry_count += 1 continue @@ -910,6 +919,7 @@ def put_add_node(self): node = request_body.get('node', None) config = request_body.get('config', DEFAULT_MCS_CONF_PATH) in_transaction = request_body.get('in_transaction', False) + read_only = request_body.get('read_only', False) if node is None: raise_422_error(module_logger, func_name, 'missing node argument') @@ -917,9 +927,9 @@ def put_add_node(self): try: if not in_transaction: with TransactionManager(extra_nodes=[node]): - response = ClusterHandler.add_node(node, config) + response = ClusterHandler.add_node(node, config, read_only) else: - response = ClusterHandler.add_node(node, config) + response = ClusterHandler.add_node(node, config, read_only) except CMAPIBasicError as err: raise_422_error(module_logger, func_name, err.message) diff --git a/cmapi/cmapi_server/failover_agent.py b/cmapi/cmapi_server/failover_agent.py index 864715e090..1e4f5fdac7 100644 --- a/cmapi/cmapi_server/failover_agent.py +++ b/cmapi/cmapi_server/failover_agent.py @@ -95,7 +95,10 @@ def enterStandbyMode(self, test_mode = False): try: # TODO: remove test_mode condition and add mock for testing if not test_mode: - MCSProcessManager.stop_node(is_primary=nc.is_primary_node()) + MCSProcessManager.stop_node( + is_primary=nc.is_primary_node(), + is_read_only=nc.is_read_only(), + ) logger.info( 'FA.enterStandbyMode(): successfully stopped node.' 
) diff --git a/cmapi/cmapi_server/handlers/cluster.py b/cmapi/cmapi_server/handlers/cluster.py index 10a213fb70..1cc9a1d6ff 100644 --- a/cmapi/cmapi_server/handlers/cluster.py +++ b/cmapi/cmapi_server/handlers/cluster.py @@ -139,7 +139,10 @@ def shutdown( return {'timestamp': operation_start_time} @staticmethod - def add_node(node: str, config: str = DEFAULT_MCS_CONF_PATH) -> dict: + def add_node( + node: str, config: str = DEFAULT_MCS_CONF_PATH, + read_only: bool = False, + ) -> dict: """Method to add node to MCS CLuster. :param node: node IP or name or FQDN @@ -147,6 +150,8 @@ def add_node(node: str, config: str = DEFAULT_MCS_CONF_PATH) -> dict: :param config: columnstore xml config file path, defaults to DEFAULT_MCS_CONF_PATH :type config: str, optional + :param read_only: add node in read-only mode, defaults to False + :type read_only: bool, optional :raises CMAPIBasicError: on exception while starting transaction :raises CMAPIBasicError: if transaction start isn't successful :raises CMAPIBasicError: on exception while adding node @@ -157,20 +162,30 @@ def add_node(node: str, config: str = DEFAULT_MCS_CONF_PATH) -> dict: :rtype: dict """ logger: logging.Logger = logging.getLogger('cmapi_server') - logger.debug(f'Cluster add node command called. Adding node {node}.') + logger.debug( + f'Cluster add node command called. Adding node {node} in ' + f'{"read-only" if read_only else "read-write"} mode.' 
+ ) response = {'timestamp': str(datetime.now())} try: add_node( node, input_config_filename=config, - output_config_filename=config + output_config_filename=config, + read_only=read_only, ) if not get_dbroots(node, config): - add_dbroot( - host=node, input_config_filename=config, - output_config_filename=config - ) + if not read_only: # Read-only nodes don't own dbroots + add_dbroot( + host=node, input_config_filename=config, + output_config_filename=config + ) + else: + logger.debug( + f'Node {node} is read-only, skipping dbroot addition' + ) + except Exception as err: raise CMAPIBasicError('Error while adding node.') from err diff --git a/cmapi/cmapi_server/helpers.py b/cmapi/cmapi_server/helpers.py index 53fb003e78..e022d35033 100644 --- a/cmapi/cmapi_server/helpers.py +++ b/cmapi/cmapi_server/helpers.py @@ -11,7 +11,6 @@ import socket import time from collections import namedtuple -from functools import partial from random import random from shutil import copyfile from typing import Tuple, Optional @@ -541,6 +540,11 @@ def get_desired_nodes(config=DEFAULT_MCS_CONF_PATH): return [ node.text for node in nodes ] +def get_read_only_nodes(root) -> list[str]: + """Get names of read only nodes from config""" + return [node.text for node in root.findall('./ReadOnlyNodes/Node')] + + def in_maintenance_state(config=DEFAULT_MCS_CONF_PATH): nc = NodeConfig() root = nc.get_current_config_root(config, upgrade=False) @@ -577,6 +581,7 @@ def get_dbroots(node, config=DEFAULT_MCS_CONF_PATH): dbroots = [] smc_node = root.find('./SystemModuleConfig') mod_count = int(smc_node.find('./ModuleCount3').text) + for i in range(1, mod_count+1): ip_addr = smc_node.find(f'./ModuleIPAddr{i}-1-3').text hostname = smc_node.find(f'./ModuleHostName{i}-1-3').text @@ -596,6 +601,7 @@ def get_dbroots(node, config=DEFAULT_MCS_CONF_PATH): dbroots.append( smc_node.find(f"./ModuleDBRootID{i}-{j}-3").text ) + return dbroots diff --git a/cmapi/cmapi_server/managers/process.py 
b/cmapi/cmapi_server/managers/process.py index 1adb62e8d6..9b0d2264d0 100644 --- a/cmapi/cmapi_server/managers/process.py +++ b/cmapi/cmapi_server/managers/process.py @@ -7,7 +7,8 @@ import psutil from cmapi_server.exceptions import CMAPIBasicError -from cmapi_server.constants import MCS_INSTALL_BIN, ALL_MCS_PROGS +from cmapi_server.constants import MCS_INSTALL_BIN, ALL_MCS_PROGS, MCSProgs +from cmapi_server.process_dispatchers.base import BaseDispatcher from cmapi_server.process_dispatchers.systemd import SystemdDispatcher from cmapi_server.process_dispatchers.container import ( ContainerDispatcher @@ -18,7 +19,7 @@ from mcs_node_control.models.process import Process -PROCESS_DISPATCHERS = { +PROCESS_DISPATCHERS: dict[str, type[BaseDispatcher]] = { 'systemd': SystemdDispatcher, # could be used in docker containers and OSes w/o systemd 'container': ContainerDispatcher, @@ -404,19 +405,26 @@ def is_node_processes_ok( return set(node_progs) == set(p['name'] for p in running_procs) @classmethod - def start_node(cls, is_primary: bool, use_sudo: bool = True): + def start_node( + cls, + is_primary: bool, + use_sudo: bool = True, + is_read_only: bool = False, + ) -> None: """Start mcs node processes. 
:param is_primary: is node primary or not, defaults to True :type is_primary: bool :param use_sudo: use sudo or not, defaults to True :type use_sudo: bool, optional + :param is_read_only: if true, doesn't start WriteEngine + :type is_read_only: bool, optional :raises CMAPIBasicError: immediately if one mcs process not started """ for prog_name in cls._get_sorted_progs(is_primary): if ( cls.dispatcher_name == 'systemd' - and prog_name == 'StorageManager' + and prog_name == MCSProgs.STORAGE_MANAGER.value ): # TODO: MCOL-5458 logging.info( @@ -424,17 +432,27 @@ def start_node(cls, is_primary: bool, use_sudo: bool = True): ) continue # TODO: additional error handling - if prog_name == 'controllernode': + if prog_name == MCSProgs.CONTROLLER_NODE.value: cls._wait_for_workernodes() - if prog_name in ('DMLProc', 'DDLProc'): + if prog_name in (MCSProgs.DML_PROC.value, MCSProgs.DDL_PROC.value): cls._wait_for_controllernode() + if ( + is_read_only and + prog_name == MCSProgs.WRITE_ENGINE_SERVER.value + ): + logging.debug('Node is in read-only mode, not starting WriteEngine') + continue if not cls.start(prog_name, is_primary, use_sudo): logging.error(f'Process "{prog_name}" not started properly.') raise CMAPIBasicError(f'Error while starting "{prog_name}".') @classmethod def stop_node( - cls, is_primary: bool, use_sudo: bool = True, timeout: int = 10 + cls, + is_primary: bool, + use_sudo: bool = True, + timeout: int = 10, + is_read_only: bool = False, ): """Stop mcs node processes. 
@@ -444,6 +462,8 @@ def stop_node( :type use_sudo: bool, optional :param timeout: timeout for DMLProc gracefully stop using DBRM, seconds :type timeout: int + :param is_read_only: if true, doesn't stop WriteEngine + :type is_read_only: bool, optional :raises CMAPIBasicError: immediately if one mcs process not stopped """ # Every time try to stop all processes no matter primary it or slave, @@ -456,8 +476,8 @@ def stop_node( raise CMAPIBasicError(f'Error while stopping "{prog_name}"') @classmethod - def restart_node(cls, is_primary: bool, use_sudo: bool): + def restart_node(cls, is_primary: bool, use_sudo: bool, is_read_only: bool = False): """TODO: For next releases.""" if cls.get_running_mcs_procs(): - cls.stop_node(is_primary, use_sudo) - cls.start_node(is_primary, use_sudo) + cls.stop_node(is_primary, use_sudo, is_read_only) + cls.start_node(is_primary, use_sudo, is_read_only) diff --git a/cmapi/cmapi_server/managers/transaction.py b/cmapi/cmapi_server/managers/transaction.py index 68bb7bc77c..98de505241 100644 --- a/cmapi/cmapi_server/managers/transaction.py +++ b/cmapi/cmapi_server/managers/transaction.py @@ -106,10 +106,10 @@ def rollback_transaction(self, nodes: Optional[list] = None) -> None: try: rollback_transaction(self.txn_id, nodes=nodes) self.active_transaction = False - logging.debug(f'Success rollback of transaction "{self.txn_id}".') + logging.debug(f'Successful rollback of transaction "{self.txn_id}".') except Exception: logging.error( - f'Error while rollback transaction "{self.txn_id}"', + f'Error while rolling back transaction "{self.txn_id}"', exc_info=True ) diff --git a/cmapi/cmapi_server/node_manipulation.py b/cmapi/cmapi_server/node_manipulation.py index 7eee0ad18e..4c2e2ff1db 100644 --- a/cmapi/cmapi_server/node_manipulation.py +++ b/cmapi/cmapi_server/node_manipulation.py @@ -61,7 +61,8 @@ def switch_node_maintenance( def add_node( node: str, input_config_filename: str = DEFAULT_MCS_CONF_PATH, output_config_filename: Optional[str] = 
None, - use_rebalance_dbroots: bool = True + use_rebalance_dbroots: bool = True, + read_only: bool = False, ): """Add node to a cluster. @@ -95,14 +96,23 @@ def add_node( try: if not _replace_localhost(c_root, node): pm_num = _add_node_to_PMS(c_root, node) - _add_WES(c_root, pm_num, node) + + if not read_only: + _add_WES(c_root, pm_num, node) + else: + logging.info('Node is read-only, skipping WES addition.') + _add_read_only_node(c_root, node) + _add_DBRM_Worker(c_root, node) _add_Module_entries(c_root, node) _add_active_node(c_root, node) _add_node_to_ExeMgrs(c_root, node) if use_rebalance_dbroots: - _rebalance_dbroots(c_root) - _move_primary_node(c_root) + if not read_only: + _rebalance_dbroots(c_root) + _move_primary_node(c_root) + else: + add_dbroots_of_other_nodes(c_root, pm_num) except Exception: logging.error( 'Caught exception while adding node, config file is unchanged', @@ -156,7 +166,11 @@ def remove_node( if len(active_nodes) > 1: pm_num = _remove_node_from_PMS(c_root, node) - _remove_WES(c_root, pm_num) + + is_read_only = node in helpers.get_read_only_nodes(c_root) + if not is_read_only: + _remove_WES(c_root, pm_num) + _remove_DBRM_Worker(c_root, node) _remove_Module_entries(c_root, node) _remove_from_ExeMgrs(c_root, node) @@ -167,9 +181,12 @@ def remove_node( # TODO: unspecific name, need to think of a better one _remove_node(c_root, node) - if use_rebalance_dbroots: + if use_rebalance_dbroots and not is_read_only: _rebalance_dbroots(c_root) _move_primary_node(c_root) + + if is_read_only: + remove_dbroots_of_node(c_root, pm_num) else: # TODO: # - IMO undefined behaviour here. 
Removing one single node @@ -375,12 +392,16 @@ def __remove_helper(parent_node, node): def _remove_node(root, node): ''' - remove node from DesiredNodes, InactiveNodes, and ActiveNodes + remove node from DesiredNodes, InactiveNodes, ActiveNodes and (if present) ReadOnlyNodes ''' for n in (root.find("./DesiredNodes"), root.find("./InactiveNodes"), root.find("./ActiveNodes")): __remove_helper(n, node) + read_only_nodes = root.find('./ReadOnlyNodes') + if read_only_nodes is not None: + __remove_helper(read_only_nodes, node) + # This moves a node from ActiveNodes to InactiveNodes def _deactivate_node(root, node): @@ -528,6 +549,19 @@ def unassign_dbroot1(root): i += 1 +def _get_existing_db_roots(root: etree.Element) -> list[int]: + '''Get all the existing dbroot IDs from the config file''' + # There can be holes in the dbroot numbering, so can't just scan from [1-dbroot_count] + # Going to scan from 1-99 instead + sysconf_node = root.find("./SystemConfig") + existing_dbroots = [] + for num in range(1, 100): + node = sysconf_node.find(f"./DBRoot{num}") + if node is not None: + existing_dbroots.append(num) + return existing_dbroots + + def _rebalance_dbroots(root, test_mode=False): # TODO: add code to detect whether we are using shared storage or not. If not, exit # without doing anything. @@ -571,14 +605,7 @@ def _rebalance_dbroots(root, test_mode=False): current_mapping = get_current_dbroot_mapping(root) sysconf_node = root.find("./SystemConfig") - - # There can be holes in the dbroot numbering, so can't just scan from [1-dbroot_count] - # Going to scan from 1-99 instead. 
- existing_dbroots = [] - for num in range(1, 100): - node = sysconf_node.find(f"./DBRoot{num}") - if node is not None: - existing_dbroots.append(num) + existing_dbroots = _get_existing_db_roots(root) # assign the unassigned dbroots unassigned_dbroots = set(existing_dbroots) - set(current_mapping[0]) @@ -630,7 +657,7 @@ def _rebalance_dbroots(root, test_mode=False): # timed out # possible node is not ready, leave retry as-is pass - except Exception as e: + except Exception: retry = False if not found_master: @@ -966,7 +993,7 @@ def _add_Module_entries(root, node): logging.info(f"_add_Module_entries(): hostname doesn't match, updating address to {new_ip_addr}") smc_node.find(f"ModuleHostName{i}-1-3").text = new_ip_addr else: - logging.info(f"_add_Module_entries(): no update is necessary") + logging.info("_add_Module_entries(): no update is necessary") return # if we find a matching hostname, update the ip addr @@ -988,6 +1015,22 @@ def _add_WES(root, pm_num, node): etree.SubElement(wes_node, "Port").text = "8630" +def _add_read_only_node(root: etree.Element, node: str) -> None: + '''Add node name to ReadOnlyNodes if it is not already there''' + read_only_nodes = root.find('./ReadOnlyNodes') + if read_only_nodes is None: + read_only_nodes = etree.SubElement(root, 'ReadOnlyNodes') + else: + for n in read_only_nodes.findall("./Node"): + if n.text == node: + logging.warning( + f"_add_read_only_node(): node {node} already exists in ReadOnlyNodes" + ) + return + + etree.SubElement(read_only_nodes, "Node").text = node + + def _add_DBRM_Worker(root, node): ''' find the highest numbered DBRM_Worker entry, or one that isn't used atm @@ -1090,7 +1133,7 @@ def _add_node_to_PMS(root, node): return new_pm_num -def _replace_localhost(root, node): +def _replace_localhost(root: etree.Element, node: str) -> bool: # if DBRM_Controller/IPAddr is 127.0.0.1 or localhost, # then replace all instances, else do nothing. 
controller_host = root.find('./DBRM_Controller/IPAddr') @@ -1138,3 +1181,51 @@ def _replace_localhost(root, node): # New Exception types class NodeNotFoundException(Exception): pass + + +def add_dbroots_of_other_nodes(root: etree.Element, module_num: int) -> None: + """Adds all the dbroots listed in the config to this (read-only) node""" + existing_dbroots = _get_existing_db_roots(root) + sysconf_node = root.find("./SystemModuleConfig") + + # Write node's dbroot count + dbroot_count_node = sysconf_node.find(f"./ModuleDBRootCount{module_num}-3") + if dbroot_count_node is not None: + sysconf_node.remove(dbroot_count_node) + dbroot_count_node = etree.SubElement( + sysconf_node, f"ModuleDBRootCount{module_num}-3" + ) + dbroot_count_node.text = str(len(existing_dbroots)) + + # Remove existing dbroot IDs of this module if present + for i in range(1, 100): + dbroot_id_node = sysconf_node.find(f"./ModuleDBRootID{module_num}-{i}-3") + if dbroot_id_node is not None: + sysconf_node.remove(dbroot_id_node) + + # Write new dbroot IDs to the module mapping + for i, dbroot_id in enumerate(existing_dbroots, start=1): + dbroot_id_node = etree.SubElement( + sysconf_node, f"ModuleDBRootID{module_num}-{i}-3" + ) + dbroot_id_node.text = str(dbroot_id) + + logging.info("Added %d dbroots to read-only node %d: %s", len(existing_dbroots), module_num, sorted(existing_dbroots)) + + +def remove_dbroots_of_node(root: etree.Element, module_num: int) -> None: + """Removes all the dbroots listed in the config from this (read-only) node""" + sysconf_node = root.find("./SystemModuleConfig") + dbroot_count_node = sysconf_node.find(f"./ModuleDBRootCount{module_num}-3") + if dbroot_count_node is not None: + sysconf_node.remove(dbroot_count_node) + else: + logging.error( + f"ModuleDBRootCount{module_num}-3 not found in SystemModuleConfig" + ) + + # Remove existing dbroot IDs + for i in range(1, 100): + dbroot_id_node = sysconf_node.find(f"./ModuleDBRootID{module_num}-{i}-3") + if dbroot_id_node is not 
None: + sysconf_node.remove(dbroot_id_node) \ No newline at end of file diff --git a/cmapi/cmapi_server/test/test_cluster.py b/cmapi/cmapi_server/test/test_cluster.py index 5887a8fe1c..833def4851 100644 --- a/cmapi/cmapi_server/test/test_cluster.py +++ b/cmapi/cmapi_server/test/test_cluster.py @@ -6,6 +6,7 @@ import requests +from cmapi_server.constants import MCSProgs from cmapi_server.controllers.dispatcher import _version from cmapi_server.managers.process import MCSProcessManager from cmapi_server.test.unittest_global import ( @@ -199,7 +200,7 @@ def test_endpoint(self): # Check Columntore started controllernode = subprocess.check_output( - ['pgrep', 'controllernode']) + ['pgrep', MCSProgs.CONTROLLER_NODE.value]) self.assertIsNotNone(controllernode) diff --git a/cmapi/cmapi_server/test/test_node_manip.py b/cmapi/cmapi_server/test/test_node_manip.py index 22a35c64a7..195c286499 100644 --- a/cmapi/cmapi_server/test/test_node_manip.py +++ b/cmapi/cmapi_server/test/test_node_manip.py @@ -1,10 +1,13 @@ import logging import socket - +import unittest +from unittest.mock import patch, ANY from lxml import etree from cmapi_server import node_manipulation from cmapi_server.constants import MCS_DATA_PATH +from cmapi_server.helpers import get_read_only_nodes +from cmapi_server.node_manipulation import add_dbroots_of_other_nodes, remove_dbroots_of_node from cmapi_server.test.unittest_global import ( tmp_mcs_config_filename, BaseNodeManipTestCase ) @@ -13,6 +16,8 @@ logging.basicConfig(level='DEBUG') +SINGLE_NODE_XML = "./cmapi_server/SingleNode.xml" + class NodeManipTester(BaseNodeManipTestCase): @@ -52,6 +57,63 @@ def test_add_remove_node(self): # node = root.find('./PMS2/IPAddr') # self.assertEqual(node, None) + def test_add_remove_read_only_node(self): + """add_node(read_only=True) should add a read-only node into the config, it does not add a WriteEngineServer (WES) and does not own dbroots""" + self.tmp_files = ('./config_output_rw.xml', './config_output_ro.xml', 
'./config_output_ro_removed.xml') + + # Add this host as a read-write node + local_host_addr = socket.gethostbyname(socket.gethostname()) + node_manipulation.add_node( + local_host_addr, SINGLE_NODE_XML, self.tmp_files[0] + ) + + # Mock _rebalance_dbroots and _move_primary_node (only after the first node is added) + with patch('cmapi_server.node_manipulation._rebalance_dbroots') as mock_rebalance_dbroots, \ + patch('cmapi_server.node_manipulation._move_primary_node') as mock_move_primary_node, \ + patch('cmapi_server.node_manipulation.add_dbroots_of_other_nodes') as mock_add_dbroots_of_other_nodes, \ + patch('cmapi_server.node_manipulation.remove_dbroots_of_node') as mock_remove_dbroots_of_node: + + # Add a read-only node + node_manipulation.add_node( + self.NEW_NODE_NAME, self.tmp_files[0], self.tmp_files[1], + read_only=True, + ) + + nc = NodeConfig() + root = nc.get_current_config_root(self.tmp_files[1]) + + # Check if read-only nodes section exists and is filled + read_only_nodes = get_read_only_nodes(root) + self.assertEqual(len(read_only_nodes), 1) + self.assertEqual(read_only_nodes[0], self.NEW_NODE_NAME) + + # Check if PMS was added + pms_node_ipaddr = root.find('./PMS2/IPAddr') + self.assertEqual(pms_node_ipaddr.text, self.NEW_NODE_NAME) + + # Check that WriteEngineServer was not added + wes_node = root.find('./pm2_WriteEngineServer') + self.assertIsNone(wes_node) + + mock_rebalance_dbroots.assert_not_called() + mock_move_primary_node.assert_not_called() + mock_add_dbroots_of_other_nodes.assert_called_once_with(ANY, 2) + + # Test read-only node removal + node_manipulation.remove_node( + self.NEW_NODE_NAME, self.tmp_files[1], self.tmp_files[2], + ) + + nc = NodeConfig() + root = nc.get_current_config_root(self.tmp_files[2]) + read_only_nodes = get_read_only_nodes(root) + self.assertEqual(len(read_only_nodes), 0) + + mock_rebalance_dbroots.assert_not_called() + mock_move_primary_node.assert_not_called() + 
mock_remove_dbroots_of_node.assert_called_once_with(ANY, 2) + + def test_add_dbroots_nodes_rebalance(self): self.tmp_files = ( './extra-dbroots-0.xml', './extra-dbroots-1.xml', @@ -209,3 +271,59 @@ def test_unassign_dbroot1(self): caught_it = True self.assertTrue(caught_it) + + +class TestReadOnlyNodeDBRootsManip(unittest.TestCase): + our_module_idx = 2 + + def setUp(self): + # Mock initial XML structure (add two dbroots) + self.root = etree.Element('Columnstore') + etree.SubElement(self.root, 'SystemModuleConfig') + system_config = etree.SubElement(self.root, 'SystemConfig') + dbroot_count = etree.SubElement(system_config, 'DBRootCount') + dbroot_count.text = '2' + dbroot1 = etree.SubElement(system_config, 'DBRoot1') + dbroot1.text = '/data/dbroot1' + dbroot2 = etree.SubElement(system_config, 'DBRoot2') + dbroot2.text = '/data/dbroot2' + + def test_add_dbroots_of_other_nodes(self): + '''add_dbroots_of_other_nodes must add dbroots of other nodes into mapping of the node.''' + add_dbroots_of_other_nodes(self.root, self.our_module_idx) + + # Check that ModuleDBRootCount of the module was updated + module_count = self.root.find(f'./SystemModuleConfig/ModuleDBRootCount{self.our_module_idx}-3') + self.assertIsNotNone(module_count) + self.assertEqual(module_count.text, '2') + + # Check that dbroots were added to ModuleDBRootID{module_num}-{i}-3 + dbroot1 = self.root.find(f'./SystemModuleConfig/ModuleDBRootID{self.our_module_idx}-1-3') + dbroot2 = self.root.find(f'./SystemModuleConfig/ModuleDBRootID{self.our_module_idx}-2-3') + self.assertIsNotNone(dbroot1) + self.assertIsNotNone(dbroot2) + self.assertEqual(dbroot1.text, '1') + self.assertEqual(dbroot2.text, '2') + + def test_remove_dbroots_of_node(self): + '''Test that remove_dbroots_of_node correctly removes dbroots from the node's mapping''' + # Add dbroot association to the node + smc = self.root.find('./SystemModuleConfig') + dbroot1 = etree.SubElement(smc, f'ModuleDBRootID{self.our_module_idx}-1-3') + dbroot1.text = 
'1' + dbroot2 = etree.SubElement(smc, f'ModuleDBRootID{self.our_module_idx}-2-3') + dbroot2.text = '2' + # Add ModuleDBRootCount to the node + module_count = etree.SubElement(smc, f'ModuleDBRootCount{self.our_module_idx}-3') + module_count.text = '2' + + remove_dbroots_of_node(self.root, self.our_module_idx) + + # Check that ModuleDBRootCount was removed + module_count = self.root.find(f'./SystemModuleConfig/ModuleDBRootCount{self.our_module_idx}-3') + self.assertIsNone(module_count) + # Check that dbroot mappings of the module were removed + dbroot1 = self.root.find(f'./SystemModuleConfig/ModuleDBRootID{self.our_module_idx}-1-3') + dbroot2 = self.root.find(f'./SystemModuleConfig/ModuleDBRootID{self.our_module_idx}-2-3') + self.assertIsNone(dbroot1) + self.assertIsNone(dbroot2) diff --git a/cmapi/mcs_cluster_tool/__main__.py b/cmapi/mcs_cluster_tool/__main__.py index a2260d0c63..f339f7f6a6 100644 --- a/cmapi/mcs_cluster_tool/__main__.py +++ b/cmapi/mcs_cluster_tool/__main__.py @@ -76,7 +76,10 @@ def setup_logging(verbose: bool = False) -> None: add_logging_level('TRACE', 5) dict_config(MCS_CLI_LOG_CONF_PATH) if verbose: - enable_console_logging(logging.getLogger()) + for logger_name in ('', 'mcs_cli'): + logger = logging.getLogger(logger_name) + logger.setLevel(logging.DEBUG) + enable_console_logging(logger) if __name__ == '__main__': diff --git a/cmapi/mcs_cluster_tool/cluster_app.py b/cmapi/mcs_cluster_tool/cluster_app.py index d93c8cc2c9..9b0f37a6f1 100644 --- a/cmapi/mcs_cluster_tool/cluster_app.py +++ b/cmapi/mcs_cluster_tool/cluster_app.py @@ -198,6 +198,14 @@ def add( 'node IP, name or FQDN. ' 'Can be used multiple times to add several nodes at a time.' ) + ), + read_only: bool = typer.Option( + False, + '--read-only', + help=( + 'Add node (or nodes, if more than one is passed) in read-only ' + 'mode.' 
+ ) ) ): """Add nodes to the Columnstore cluster.""" @@ -207,7 +215,9 @@ def add( extra_nodes=nodes ): for node in nodes: - result.append(client.add_node({'node': node})) + result.append( + client.add_node({'node': node, 'read_only': read_only}) + ) return result diff --git a/cmapi/mcs_node_control/models/node_config.py b/cmapi/mcs_node_control/models/node_config.py index 7dac18bce4..c71e397032 100644 --- a/cmapi/mcs_node_control/models/node_config.py +++ b/cmapi/mcs_node_control/models/node_config.py @@ -36,7 +36,7 @@ class NodeConfig: """ def get_current_config_root( self, config_filename: str = DEFAULT_MCS_CONF_PATH, upgrade=True - ): + ) -> etree.Element: """Retrieves current configuration. Read the config and returns Element. @@ -49,7 +49,7 @@ def get_current_config_root( self.upgrade_config(tree=tree, upgrade=upgrade) return tree.getroot() - def get_root_from_string(self, config_string: str): + def get_root_from_string(self, config_string: str) -> etree.Element: root = etree.fromstring(config_string) self.upgrade_config(root=root) return root @@ -566,4 +566,14 @@ def get_all_dbroots(self, root): for i in range(1, mod_count+1): for j in range(1, int(smc_node.find(f"./ModuleDBRootCount{i}-3").text) + 1): dbroots.append(smc_node.find(f"./ModuleDBRootID{i}-{j}-3").text) + return dbroots + + def is_read_only(self, root=None) -> bool: + """Checks if this node is in read-only mode""" + from cmapi_server.helpers import get_read_only_nodes # Avoid circular import + + root = root or self.get_current_config_root() + read_only_nodes = set(get_read_only_nodes(root)) + my_names = set(self.get_network_addresses_and_names()) + return bool(read_only_nodes.intersection(my_names)) \ No newline at end of file diff --git a/cmapi/pyproject.toml b/cmapi/pyproject.toml new file mode 100644 index 0000000000..c81c64faae --- /dev/null +++ b/cmapi/pyproject.toml @@ -0,0 +1,22 @@ +[tool.ruff] +line-length = 80 +target-version = "py39" +# Enable common rule sets +select = [ + "E", # 
pycodestyle errors + "F", # pyflakes: undefined names, unused imports, etc. + "I", # isort: import sorting + "B", # flake8-bugbear: common bugs and anti-patterns + "UP", # pyupgrade: use modern Python syntax + "N", # pep8-naming: naming conventions +] + +ignore = [] + +# Exclude cache and temporary directories +exclude = [ + "__pycache__", +] + +[tool.ruff.format] +quote-style = "single" diff --git a/dbcon/dmlpackageproc/dmlpackageprocessor.cpp b/dbcon/dmlpackageproc/dmlpackageprocessor.cpp index 307ba1d399..676fab0ef5 100644 --- a/dbcon/dmlpackageproc/dmlpackageprocessor.cpp +++ b/dbcon/dmlpackageproc/dmlpackageprocessor.cpp @@ -350,7 +350,7 @@ int DMLPackageProcessor::rollBackTransaction(uint64_t uniqueId, BRM::TxnID txnID while (1) { - if (msgRecived == fWEClient->getPmCount()) + if (msgRecived == fWEClient->getRWConnections()) break; fWEClient->read(uniqueId, bsIn); diff --git a/dbcon/joblist/distributedenginecomm.cpp b/dbcon/joblist/distributedenginecomm.cpp index da3a9b2d36..0a619c7067 100644 --- a/dbcon/joblist/distributedenginecomm.cpp +++ b/dbcon/joblist/distributedenginecomm.cpp @@ -303,6 +303,7 @@ int32_t DistributedEngineComm::Setup() try { + idblog("trying to connect to PMS" << connectionId << ", " << cl->otherEnd() << ", address " << cl->addr2String()); if (cl->connect()) { newClients.push_back(cl); @@ -313,6 +314,7 @@ int32_t DistributedEngineComm::Setup() } else { + idblog("cannot connecti to PMS" << connectionId); throw runtime_error("Connection refused from PMS" + std::to_string(connectionId)); } } @@ -1092,7 +1094,9 @@ int DistributedEngineComm::writeToClient(size_t aPMIndex, const SBS& bs, uint32_ if (!client->isAvailable()) return 0; + idblog("locking connection " << connectionId << " to send " << bs->length() << "bytes"); std::lock_guard lk(*(fWlock[connectionId])); + idblog("locked connection " << connectionId); client->write(bs, NULL, senderStats); } catch (...) 
@@ -1133,6 +1137,7 @@ int DistributedEngineComm::writeToClient(size_t aPMIndex, const SBS& bs, uint32_ else throw runtime_error("DistributedEngineComm::write: Broken Pipe error"); } + idblog("bs length after write " << bs->length()); // Connection was established. return 1; @@ -1168,6 +1173,7 @@ int DistributedEngineComm::writeToClient(size_t aPMIndex, const SBS& bs, uint32_ alarmMgr.sendAlarmReport(alarmItem.c_str(), oam::CONN_FAILURE, SET); */ } + idblog("bs length after write " << bs->length()); return 0; } @@ -1288,6 +1294,13 @@ void DistributedEngineComm::getLocalNetIfacesSins() } freeifaddrs(netIfacesList); } +void DistributedEngineComm::healthCheck() +{ + for (uint32_t i = 0; i < fPmConnections.size(); i++) + { + std::lock_guard lk(*(fWlock[i])); + } +} template bool DistributedEngineComm::clientAtTheSameHost( SharedPtrEMSock& client) const; } // namespace joblist diff --git a/dbcon/joblist/distributedenginecomm.h b/dbcon/joblist/distributedenginecomm.h index 2c6c6ac865..950687856d 100644 --- a/dbcon/joblist/distributedenginecomm.h +++ b/dbcon/joblist/distributedenginecomm.h @@ -224,6 +224,9 @@ class DistributedEngineComm friend class ::TestDistributedEngineComm; + /* internal checks */ + void healthCheck(); + private: typedef std::vector ReaderList; typedef std::vector> ClientList; diff --git a/dbcon/mysql/install_mcs_mysql.sh.in b/dbcon/mysql/install_mcs_mysql.sh.in index ff80a18cf2..bc856d00ec 100755 --- a/dbcon/mysql/install_mcs_mysql.sh.in +++ b/dbcon/mysql/install_mcs_mysql.sh.in @@ -16,6 +16,11 @@ else MDB="/usr/bin/mysql" fi +log_error() { + echo "Error: $1" >> ${tmpdir}/mysql_install.log + exit 1 +} + # DELETE libcalmysql.so entries first as they are in ha_columnstore.so in 1.4.2 onwards $MDB 2> ${tmpdir}/mysql_install.log </dev/null +if [ $? -ne 0 ]; then + log_error "Failed to execute calsetuserpriority.sql." +fi + $MDB <@ENGINE_SUPPORTDIR@/calremoveuserpriority.sql 2>/dev/null +if [ $? 
-ne 0 ]; then + log_error "Failed to execute calremoveuserpriority.sql." +fi + $MDB <@ENGINE_SUPPORTDIR@/calshowprocesslist.sql 2>/dev/null +if [ $? -ne 0 ]; then + log_error "Failed to execute calshowprocesslist.sql." +fi + $MDB <@ENGINE_SUPPORTDIR@/columnstore_info.sql 2>/dev/null +if [ $? -ne 0 ]; then + log_error "Failed to execute columnstore_info.sql." +fi diff --git a/debian/control b/debian/control index cbb11cea4a..7bde08ee80 100644 --- a/debian/control +++ b/debian/control @@ -9,6 +9,7 @@ Depends: binutils, net-tools, python3, procps, + gawk, ${misc:Depends}, ${shlibs:Depends} Breaks: mariadb-columnstore-libs, diff --git a/dmlproc/batchinsertprocessor.cpp b/dmlproc/batchinsertprocessor.cpp index 785a31225a..d53183a267 100644 --- a/dmlproc/batchinsertprocessor.cpp +++ b/dmlproc/batchinsertprocessor.cpp @@ -230,6 +230,15 @@ void BatchInsertProc::buildLastPkg(messageqcpp::ByteStream& bs) bs << rt; } +uint32_t BatchInsertProc::selectNextPM() +{ + uint32_t pm; + do + { + pm = fBatchLoader->selectNextPM(); + } while (pm != 0 && fWEClient->isConnectionReadonly(pm)); + return pm; +} void BatchInsertProc::sendFirstBatch() { uint32_t firstPmId = 0; @@ -237,7 +246,7 @@ void BatchInsertProc::sendFirstBatch() try { - firstPmId = fBatchLoader->selectNextPM(); + firstPmId = selectNextPM(); } catch (std::exception& ex) { @@ -268,7 +277,7 @@ void BatchInsertProc::sendNextBatch() try { - fCurrentPMid = fBatchLoader->selectNextPM(); + fCurrentPMid = selectNextPM(); } catch (std::exception& ex) { diff --git a/dmlproc/batchinsertprocessor.h b/dmlproc/batchinsertprocessor.h index a950b0986d..ddb2a229ed 100644 --- a/dmlproc/batchinsertprocessor.h +++ b/dmlproc/batchinsertprocessor.h @@ -68,6 +68,7 @@ class BatchInsertProc void setHwm(); void receiveAllMsg(); void receiveOutstandingMsg(); + uint32_t selectNextPM(); private: SP_PKG fInsertPkgQueue; diff --git a/dmlproc/dmlproc.cpp b/dmlproc/dmlproc.cpp index 0bc59e2e07..fcf3f1f64b 100644 --- a/dmlproc/dmlproc.cpp +++ 
b/dmlproc/dmlproc.cpp @@ -202,6 +202,7 @@ class CleanUpThread // This function rolls back any active transactions in case of an abnormal shutdown. void rollbackAll(DBRM* dbrm) { + idblog("rolling back all"); Oam oam; // Log a message in info.log @@ -592,6 +593,7 @@ int ServiceDMLProc::Child() //@Bug 1627 try { + idblog("rollback in Child() call"); rollbackAll(&dbrm); // Rollback any } catch (std::exception& e) diff --git a/dmlproc/dmlprocessor.cpp b/dmlproc/dmlprocessor.cpp index 765f5f19f2..46949dbfbb 100644 --- a/dmlproc/dmlprocessor.cpp +++ b/dmlproc/dmlprocessor.cpp @@ -132,6 +132,7 @@ struct CancellationThread if (bRollback) { + idblog("starting rollback in cancellation thread"); RollbackTransactionProcessor rollbackProcessor(fDbrm); SessionManager sessionManager; uint64_t uniqueId = fDbrm->getUnique64(); @@ -1402,7 +1403,10 @@ void DMLProcessor::operator()() messageqcpp::ByteStream::byte status = 255; messageqcpp::ByteStream::octbyte rowCount = 0; - if (fDbrm->getSystemState(stateFlags) > + int rr = fDbrm->getSystemState(stateFlags); + idblog("called from DMLProcessor::operator(), returned " << rr); + + if (rr > 0) // > 0 implies successful retrieval. 
It doesn't imply anything about the contents { messageqcpp::ByteStream results; diff --git a/oam/install_scripts/columnstore-post-install.in b/oam/install_scripts/columnstore-post-install.in index a25a2eccc2..4f86eeb864 100755 --- a/oam/install_scripts/columnstore-post-install.in +++ b/oam/install_scripts/columnstore-post-install.in @@ -5,11 +5,76 @@ # Post-install steps for columnstore install running_systemd() { - if [ "$(ps --no-headers -o comm 1)" = "systemd" ]; then - echo 0 - else - echo 1 - fi + # Check if init process (PID 1) is systemd by reading /proc/1/comm + if [ -r "/proc/1/comm" ]; then + if [ "$(cat /proc/1/comm)" = "systemd" ]; then + echo 0 + return 0 + fi + fi + + # Alternative check using /proc/1/exe symlink + if [ -L "/proc/1/exe" ]; then + local exe_target + exe_target=$(readlink -f "/proc/1/exe") + if [[ "${exe_target##*/}" == "systemd" ]]; then + echo 0 + return 0 + fi + fi + + # Fallback check for systemd presence in /run/systemd + if [ -d "/run/systemd/system" ]; then + echo 0 + return 0 + fi + + echo 1 + return 1 +} + +get_parent_pid() { + local pid=$1 + local ppid + + # Read the stat file for the given PID + if [ -f "/proc/$pid/stat" ]; then + # Read the entire stat file into a variable + read -r stat <"/proc/$pid/stat" + + # Split into array (parent PID is the 4th field) + IFS=' ' read -ra stat_array <<<"$stat" + + # The PPID is the 4th element (0-based index 3) + ppid=${stat_array[3]} + + echo "$ppid" + return 0 + else + echo "Error: Process $pid does not exist" >&2 + return 1 + fi +} + +get_env_var_value() { + local pid=$1 + local var_name=$2 + local env_file="/proc/$pid/environ" + + if [ ! 
-r "$env_file" ]; then + echo "Error: Cannot read $env_file" >&2 + return 1 + fi + + # Read null-delimited environ file and process each line + while IFS= read -r -d '' line; do + if [[ "$line" == "${var_name}="* ]]; then + echo "${line#*=}" + return 0 + fi + done <"$env_file" + + return 1 } # This function recursively(up to PID 1) searches for @@ -18,67 +83,66 @@ find_env_var() { env_var_name=$1 pid=$$ ENV_VAR='' - while [ -z "$ENV_VAR" -a "$pid" != 1 ]; do - ppid=$(ps -oppid -p$pid|tail -1|awk '{print $1}') + while [ -z "$ENV_VAR" -a "$pid" != 1 ]; do + ppid=$(get_parent_pid $pid) # This condition is true in containers if [ "$ppid" == 0 ]; then - break; + break fi - env=$(strings /proc/$ppid/environ) - ENV_VAR=$(echo "$env"|awk -F= "\$1 == \"$env_var_name\" { print \$2; }") + ENV_VAR=$(get_env_var_value $ppid $env_var_name) pid=$ppid done echo $ENV_VAR } if [[ -f /etc/mysql/debian.cnf ]]; then - MDB="/usr/bin/mysql --defaults-file=/etc/mysql/debian.cnf" + MDB="/usr/bin/mysql --defaults-file=/etc/mysql/debian.cnf" else - MDB="/usr/bin/mysql" + MDB="/usr/bin/mysql" fi checkForError() { - # check for password error - grep "ERROR 1045" ${installLogDir}/mysql_install.log > ${installLogDir}/error.check - if [ `cat ${installLogDir}/error.check | wc -c` -ne 0 ]; then - echo "There were authentication issues running install_mcs_mysql.sh \ + # check for password error + grep "ERROR 1045" ${installLogDir}/mysql_install.log >${installLogDir}/error.check + if [ $(cat ${installLogDir}/error.check | wc -c) -ne 0 ]; then + echo "There were authentication issues running install_mcs_mysql.sh \ script. The log is available in ${installLogDir}/mysql_install.log. Please re-run \ columnstore-post-install manually after solving the authentication issues." 
- rm -f ${installLogDir}/error.check - return 2; - fi + rm -f ${installLogDir}/error.check + return 2 + fi - rm -f ${installLogDir}/error.check + rm -f ${installLogDir}/error.check - #--------------------------------------------------------------------------- - # See if engine columnstore exist - #--------------------------------------------------------------------------- - echo "checking for engine columnstore..." - $MDB --execute="show engines" 2> ${installLogDir}/post-mysql-install.log | grep -i columnstore >> ${installLogDir}/post-mysql-install.log 2>&1 + #--------------------------------------------------------------------------- + # See if engine columnstore exist + #--------------------------------------------------------------------------- + echo "checking for engine columnstore..." + $MDB --execute="show engines" 2>${installLogDir}/post-mysql-install.log | grep -i columnstore >>${installLogDir}/post-mysql-install.log 2>&1 - # - # Add compressiontype column to SYSCOLUMN if applicable - # - if [ $? -ne 0 ]; then - echo "columnstore doesn't exist" - return 1 - fi + # + # Add compressiontype column to SYSCOLUMN if applicable + # + if [ $? -ne 0 ]; then + echo "columnstore doesn't exist" + return 1 + fi - echo "columnstore exists" + echo "columnstore exists" - return 0; + return 0 } rpmmode=install -user=`whoami 2>/dev/null` +user=$(whoami 2>/dev/null) quiet=0 stop_mysqld=0 -if [ -z "$(pgrep -x mariadbd)" ];then +if [ -z "$(pgrep -x mariadbd)" ]; then # Startup mysqld - systemctl cat mariadb.service > /dev/null 2>&1 + systemctl cat mariadb.service >/dev/null 2>&1 if [ $? -eq 0 ] && [ $(running_systemd) -eq 0 ]; then systemctl start mariadb.service else @@ -107,12 +171,11 @@ if [ -f @ENGINE_SYSCONFDIR@/columnstore/Columnstore.xml.rpmsave ]; then fi touch /dev/shm/columnstore-test && rm /dev/shm/columnstore-test -if [ $? -ne 0 ] ; then +if [ $? -ne 0 ]; then echo "User $user will need R/W access to /dev/shm." 
exit 1 fi - profileFile="/etc/profile.d/columnstoreAlias.sh" /bin/cp -f @ENGINE_SUPPORTDIR@/columnstoreAlias /etc/profile.d/columnstoreAlias.sh chmod 644 /etc/profile.d/columnstoreAlias.sh >/dev/null 2>&1 @@ -124,7 +187,7 @@ test -d @ENGINE_LOGDIR@/trace || mkdir @ENGINE_LOGDIR@/trace >/dev/null 2>&1 test -d @ENGINE_LOGDIR@/cpimport || mkdir @ENGINE_LOGDIR@/cpimport >/dev/null 2>&1 test -d @ENGINE_LOGDIR@/install || mkdir @ENGINE_LOGDIR@/install >/dev/null 2>&1 test -h @ENGINE_LOGDIR@/data && rm -f @ENGINE_LOGDIR@/data -chmod 755 @ENGINE_LOGDIR@/corefiles > /dev/null 2>&1 +chmod 755 @ENGINE_LOGDIR@/corefiles >/dev/null 2>&1 chmod 777 @ENGINE_LOGDIR@/cpimport chmod 777 @ENGINE_LOGDIR@/install installLogDir=@ENGINE_LOGDIR@/install @@ -136,7 +199,7 @@ test -d @ENGINE_DATADIR@/data1/systemFiles/dataTransaction || rmdir @ENGINE_DATA test -d @ENGINE_DATADIR@/data1/systemFiles/dataTransaction/archive || rmdir @ENGINE_DATADIR@/data1/systemFiles/dataTransaction/archive >/dev/null 2>&1 chmod 1755 @ENGINE_DATADIR@/data1 >/dev/null 2>&1 chmod -R 1755 @ENGINE_DATADIR@/data1/systemFiles >/dev/null 2>&1 -chmod 1755 @ENGINE_SYSCONFDIR@/columnstore > /dev/null 2>&1 +chmod 1755 @ENGINE_SYSCONFDIR@/columnstore >/dev/null 2>&1 #create the bulk-load dirs mkdir -p @ENGINE_LOGDIR@/data/bulk/data/import >/dev/null 2>&1 @@ -147,28 +210,28 @@ rm -f @ENGINE_LOGDIR@/data/bulk/tmpjob/* >/dev/null 2>&1 chmod -R 777 @ENGINE_LOGDIR@/data #get columnstore temp file directory name -tmpDir=`@ENGINE_BINDIR@/mcsGetConfig SystemConfig SystemTempFileDir` -scratchDir=$tmpDir`@ENGINE_BINDIR@/mcsGetConfig SystemConfig hdfsRdwrScratch` +tmpDir=$(@ENGINE_BINDIR@/mcsGetConfig SystemConfig SystemTempFileDir) +scratchDir=$tmpDir$(@ENGINE_BINDIR@/mcsGetConfig SystemConfig hdfsRdwrScratch) mkdir $tmpDir >/dev/null 2>&1 mkdir $scratchDir >/dev/null 2>&1 chmod 777 $tmpDir chmod 777 $scratchDir #create mount directories -mkdir /mnt/tmp > /dev/null 2>&1 +mkdir /mnt/tmp >/dev/null 2>&1 if [ $user = "root" ]; then - 
#setup the columnstore service script + #setup the columnstore service script rm -f /etc/init.d/columnstore >/dev/null 2>&1 rm -f /etc/default/columnstore - systemctl=`which systemctl 2>/dev/null` + systemctl=$(which systemctl 2>/dev/null) if [ -n "$systemctl" ]; then # Removing a separate ExeMgr unit. - if [[ -f /usr/lib/systemd/system/mcs-exemgr.service ]]; then + if [[ -f /usr/lib/systemd/system/mcs-exemgr.service ]]; then rm -f /usr/lib/systemd/system/mcs-exemgr.service fi - if [[ -f /lib/systemd/system/mcs-exemgr.service ]]; then + if [[ -f /lib/systemd/system/mcs-exemgr.service ]]; then rm -f /lib/systemd/system/mcs-exemgr.service fi cp @ENGINE_SUPPORTDIR@/mariadb-columnstore.service /usr/lib/systemd/system/. >/dev/null 2>&1 @@ -181,7 +244,7 @@ if [ $user = "root" ]; then cp @ENGINE_SUPPORTDIR@/mcs-dmlproc.service /lib/systemd/system/. >/dev/null 2>&1 cp @ENGINE_SUPPORTDIR@/mcs-primproc.service /usr/lib/systemd/system/. >/dev/null 2>&1 cp @ENGINE_SUPPORTDIR@/mcs-primproc.service /lib/systemd/system/. >/dev/null 2>&1 - cp @ENGINE_SUPPORTDIR@/mcs-workernode.service /usr/lib/systemd/system/mcs-workernode@.service >/dev/null 2>&1 + cp @ENGINE_SUPPORTDIR@/mcs-workernode.service /usr/lib/systemd/system/mcs-workernode@.service >/dev/null 2>&1 cp @ENGINE_SUPPORTDIR@/mcs-workernode.service /lib/systemd/system/mcs-workernode@.service >/dev/null 2>&1 cp @ENGINE_SUPPORTDIR@/mcs-writeengineserver.service /usr/lib/systemd/system/. >/dev/null 2>&1 cp @ENGINE_SUPPORTDIR@/mcs-writeengineserver.service /lib/systemd/system/. >/dev/null 2>&1 @@ -190,27 +253,27 @@ if [ $user = "root" ]; then cp @ENGINE_SUPPORTDIR@/mcs-storagemanager.service /usr/lib/systemd/system/. >/dev/null 2>&1 cp @ENGINE_SUPPORTDIR@/mcs-storagemanager.service /lib/systemd/system/. 
>/dev/null 2>&1 systemctl enable mariadb-columnstore >/dev/null 2>&1 - systemctl enable mcs-controllernode > /dev/null 2>&1 - systemctl enable mcs-ddlproc > /dev/null 2>&1 - systemctl enable mcs-dmlproc > /dev/null 2>&1 - systemctl enable mcs-primproc > /dev/null 2>&1 - systemctl enable mcs-workernode@1 > /dev/null 2>&1 - systemctl enable mcs-writeengineserver > /dev/null 2>&1 - systemctl enable mcs-loadbrm > /dev/null 2>&1 - systemctl enable mcs-storagemanager > /dev/null 2>&1 + systemctl enable mcs-controllernode >/dev/null 2>&1 + systemctl enable mcs-ddlproc >/dev/null 2>&1 + systemctl enable mcs-dmlproc >/dev/null 2>&1 + systemctl enable mcs-primproc >/dev/null 2>&1 + systemctl enable mcs-workernode@1 >/dev/null 2>&1 + systemctl enable mcs-writeengineserver >/dev/null 2>&1 + systemctl enable mcs-loadbrm >/dev/null 2>&1 + systemctl enable mcs-storagemanager >/dev/null 2>&1 else - chkconfig=`which chkconfig 2>/dev/null` + chkconfig=$(which chkconfig 2>/dev/null) if [ -n "$chkconfig" ]; then cp @ENGINE_SBINDIR@/columnstore /etc/init.d/. >/dev/null 2>&1 - chkconfig --add columnstore > /dev/null 2>&1 - chkconfig columnstore on > /dev/null 2>&1 + chkconfig --add columnstore >/dev/null 2>&1 + chkconfig columnstore on >/dev/null 2>&1 else cp @ENGINE_SBINDIR@/columnstore /etc/init.d/. 
>/dev/null 2>&1 - updaterc=`which update-rc.d 2>/dev/null` + updaterc=$(which update-rc.d 2>/dev/null) if [ -n "$updaterc" ]; then - update-rc.d columnstore defaults 99 > /dev/null 2>&1 + update-rc.d columnstore defaults 99 >/dev/null 2>&1 else echo "" echo "Package 'systemctl', 'chkconfig' or 'update-rc.d' not installed, contact your sysadmin if you want to setup to autostart for columnstore" @@ -221,12 +284,12 @@ fi # upgrade the columnstore.cnf file if [ -f @MARIADB_MYCNFDIR@/columnstore.cnf.rpmsave ]; then - cp -f @MARIADB_MYCNFDIR@/columnstore.cnf @MARIADB_MYCNFDIR@/columnstore.cnf.new - cp -f @MARIADB_MYCNFDIR@/columnstore.cnf.rpmsave @MARIADB_MYCNFDIR@/columnstore.cnf + cp -f @MARIADB_MYCNFDIR@/columnstore.cnf @MARIADB_MYCNFDIR@/columnstore.cnf.new + cp -f @MARIADB_MYCNFDIR@/columnstore.cnf.rpmsave @MARIADB_MYCNFDIR@/columnstore.cnf fi if [ $user = "root" ]; then - @ENGINE_BINDIR@/columnstoreSyslogSetup.sh install > $installLogDir/syslog_install.log 2>&1 + @ENGINE_BINDIR@/columnstoreSyslogSetup.sh install >$installLogDir/syslog_install.log 2>&1 #check if MariaDB Columnstore system logging was setup cat $installLogDir/syslog_install.log | grep 'No System Logging' >/dev/null 2>&1 @@ -236,7 +299,7 @@ if [ $user = "root" ]; then else chown $user:$user @ENGINE_SYSCONFDIR@/columnstore/Columnstore.xml -cat < /dev/null 2>&1 +systemctl cat mariadb.service >/dev/null 2>&1 if [ $? 
-eq 0 ] && [ $(running_systemd) -eq 0 ]; then - systemctl restart mariadb.service > /dev/null 2>&1 + systemctl restart mariadb.service >/dev/null 2>&1 else - pkill mysqld > /dev/null 2>&1 - while [ -n "$(pgrep -x mysqld)" ] ; do + pkill mysqld >/dev/null 2>&1 + while [ -n "$(pgrep -x mysqld)" ]; do sleep 1 done /usr/bin/mysqld_safe & @@ -286,88 +349,88 @@ fi $MDB <@ENGINE_SUPPORTDIR@/syscatalog_mysql.sql 2>/dev/null if [ -z "$MCS_USE_S3_STORAGE" ]; then - MCS_USE_S3_STORAGE="$(find_env_var "MCS_USE_S3_STORAGE")" - MCS_S3_BUCKET="$(find_env_var "MCS_S3_BUCKET")" - MCS_S3_ENDPOINT="$(find_env_var "MCS_S3_ENDPOINT")" - MCS_S3_ACCESS_KEY_ID="$(find_env_var "MCS_S3_ACCESS_KEY_ID")" - MCS_S3_SECRET_ACCESS_KEY="$(find_env_var "MCS_S3_SECRET_ACCESS_KEY")" - MCS_S3_REGION="$(find_env_var "MCS_S3_REGION")" - MCS_S3_ROLE_NAME="$(find_env_var "MCS_S3_ROLE_NAME")" - MCS_S3_STS_REGION="$(find_env_var "MCS_S3_STS_REGION")" - MCS_S3_STS_ENDPOINT="$(find_env_var "MCS_S3_STS_ENDPOINT")" - MCS_S3_USE_HTTP="$(find_env_var "MCS_S3_USE_HTTP")" - MCS_S3_NO_SSL_VERIFY="$(find_env_var "MCS_S3_NO_SSL_VERIFY")" - MCS_S3_LIBS3_DEBUG="$(find_env_var "MCS_S3_LIBS3_DEBUG")" + MCS_USE_S3_STORAGE="$(find_env_var "MCS_USE_S3_STORAGE")" + MCS_S3_BUCKET="$(find_env_var "MCS_S3_BUCKET")" + MCS_S3_ENDPOINT="$(find_env_var "MCS_S3_ENDPOINT")" + MCS_S3_ACCESS_KEY_ID="$(find_env_var "MCS_S3_ACCESS_KEY_ID")" + MCS_S3_SECRET_ACCESS_KEY="$(find_env_var "MCS_S3_SECRET_ACCESS_KEY")" + MCS_S3_REGION="$(find_env_var "MCS_S3_REGION")" + MCS_S3_ROLE_NAME="$(find_env_var "MCS_S3_ROLE_NAME")" + MCS_S3_STS_REGION="$(find_env_var "MCS_S3_STS_REGION")" + MCS_S3_STS_ENDPOINT="$(find_env_var "MCS_S3_STS_ENDPOINT")" + MCS_S3_USE_HTTP="$(find_env_var "MCS_S3_USE_HTTP")" + MCS_S3_NO_SSL_VERIFY="$(find_env_var "MCS_S3_NO_SSL_VERIFY")" + MCS_S3_LIBS3_DEBUG="$(find_env_var "MCS_S3_LIBS3_DEBUG")" fi if [ ! 
-z "$MCS_USE_S3_STORAGE" ] && [ $MCS_USE_S3_STORAGE -eq 1 ]; then - if [ -z "$MCS_S3_BUCKET" ]; then - echo "Environment variable \$MCS_USE_S3_STORAGE is set but there is no \$MCS_S3_BUCKET." - fi - if [ -z "$MCS_S3_ACCESS_KEY_ID" ] && [ -z "$MCS_S3_ROLE_NAME" ]; then - echo "Environment variable \$MCS_USE_S3_STORAGE is set but there is no \$MCS_S3_ACCESS_KEY_ID." - fi - if [ -z "$MCS_S3_SECRET_ACCESS_KEY" ] && [ -z "$MCS_S3_ROLE_NAME" ]; then - echo "Environment variable \$MCS_USE_S3_STORAGE is set but there is no \$MCS_S3_SECRET_ACCESS_KEY." - fi - if [ -z "$MCS_S3_BUCKET" ] || [[ -z "$MCS_S3_ACCESS_KEY_ID" && -z "$MCS_S3_ROLE_NAME" ]] || [[ -z "$MCS_S3_SECRET_ACCESS_KEY" && -z "$MCS_S3_ROLE_NAME" ]]; then - echo "Using local storage." - @ENGINE_BINDIR@/mcsSetConfig -d Installation DBRootStorageType "storagemanager" - @ENGINE_BINDIR@/mcsSetConfig -d StorageManager Enabled "Y" - @ENGINE_BINDIR@/mcsSetConfig -d SystemConfig DataFilePlugin "libcloudio.so" - else - @ENGINE_BINDIR@/mcsSetConfig -d Installation DBRootStorageType "storagemanager" - @ENGINE_BINDIR@/mcsSetConfig -d StorageManager Enabled "Y" - @ENGINE_BINDIR@/mcsSetConfig -d SystemConfig DataFilePlugin "libcloudio.so" - sed -i "s|^service =.*|service = S3|" /etc/columnstore/storagemanager.cnf - if [ ! -z "$MCS_S3_REGION" ]; then - sed -i "s|^region =.*|region = $MCS_S3_REGION|" /etc/columnstore/storagemanager.cnf - fi - if [ ! -z "$MCS_S3_ROLE_NAME" ]; then - sed -i "s|^# iam_role_name =.*|iam_role_name = $MCS_S3_ROLE_NAME|" /etc/columnstore/storagemanager.cnf - fi - if [ ! -z "$MCS_S3_STS_REGION" ]; then - sed -i "s|^# sts_region =.*|sts_region = $MCS_S3_STS_REGION|" /etc/columnstore/storagemanager.cnf - fi - if [ ! -z "$MCS_S3_STS_ENDPOINT" ]; then - sed -i "s|^# sts_endpoint =.*|sts_endpoint = $MCS_S3_STS_ENDPOINT|" /etc/columnstore/storagemanager.cnf - fi - if [ ! -z "$MCS_S3_USE_HTTP" ]; then - sed -i "s|^# use_http =.*|use_http = enabled|" /etc/columnstore/storagemanager.cnf - fi - if [ ! 
-z "$MCS_S3_NO_SSL_VERIFY" ]; then - sed -i "s|^# ssl_verify =.*|ssl_verify = disabled|" /etc/columnstore/storagemanager.cnf - fi - if [ ! -z "$MCS_S3_LIBS3_DEBUG" ]; then - sed -i "s|^# libs3_debug =.*|libs3_debug = enabled|" /etc/columnstore/storagemanager.cnf - fi - sed -i "s|^bucket =.*|bucket = $MCS_S3_BUCKET|" /etc/columnstore/storagemanager.cnf - sed -i "s|^# endpoint =.*|endpoint = $MCS_S3_ENDPOINT|" /etc/columnstore/storagemanager.cnf - sed -i "s|^# aws_access_key_id =.*|aws_access_key_id = $MCS_S3_ACCESS_KEY_ID|" /etc/columnstore/storagemanager.cnf - sed -i "s|^# aws_secret_access_key =.*|aws_secret_access_key = $MCS_S3_SECRET_ACCESS_KEY|" /etc/columnstore/storagemanager.cnf - @ENGINE_BINDIR@/testS3Connection - if [ $? -ne 0 ]; then - sed -i "s|^iam_role_name =.*|# iam_role_name = |" /etc/columnstore/storagemanager.cnf - sed -i "s|^sts_region =.*|# sts_region = |" /etc/columnstore/storagemanager.cnf - sed -i "s|^sts_endpoint =.*|# sts_endpoint = |" /etc/columnstore/storagemanager.cnf - sed -i "s|^endpoint =.*|# endpoint = |" /etc/columnstore/storagemanager.cnf - sed -i "s|^aws_access_key_id =.*|# aws_access_key_id = |" /etc/columnstore/storagemanager.cnf - sed -i "s|^aws_secret_access_key =.*|# aws_secret_access_key = |" /etc/columnstore/storagemanager.cnf - echo "There was an error validating the settings used to access S3." - echo "The specified user or role must have GET, PUT, HEAD, and DELETE permissions to the bucket." 
- echo "Verify the following environment variables are correct:" - echo "MCS_S3_BUCKET" - echo "MCS_S3_ENDPOINT" - echo "MCS_S3_ACCESS_KEY_ID" - echo "MCS_S3_SECRET_ACCESS_KEY" - echo "MCS_S3_REGION" - echo "MCS_S3_ROLE_NAME (optional)" - echo "MCS_S3_STS_REGION (optional)" - echo "MCS_S3_STS_ENDPOINT (optional)" - echo "After environment variables are fixed, run command: columnstore-post-install" - exit 1 - fi - fi + if [ -z "$MCS_S3_BUCKET" ]; then + echo "Environment variable \$MCS_USE_S3_STORAGE is set but there is no \$MCS_S3_BUCKET." + fi + if [ -z "$MCS_S3_ACCESS_KEY_ID" ] && [ -z "$MCS_S3_ROLE_NAME" ]; then + echo "Environment variable \$MCS_USE_S3_STORAGE is set but there is no \$MCS_S3_ACCESS_KEY_ID." + fi + if [ -z "$MCS_S3_SECRET_ACCESS_KEY" ] && [ -z "$MCS_S3_ROLE_NAME" ]; then + echo "Environment variable \$MCS_USE_S3_STORAGE is set but there is no \$MCS_S3_SECRET_ACCESS_KEY." + fi + if [ -z "$MCS_S3_BUCKET" ] || [[ -z "$MCS_S3_ACCESS_KEY_ID" && -z "$MCS_S3_ROLE_NAME" ]] || [[ -z "$MCS_S3_SECRET_ACCESS_KEY" && -z "$MCS_S3_ROLE_NAME" ]]; then + echo "Using local storage." + @ENGINE_BINDIR@/mcsSetConfig -d Installation DBRootStorageType "storagemanager" + @ENGINE_BINDIR@/mcsSetConfig -d StorageManager Enabled "Y" + @ENGINE_BINDIR@/mcsSetConfig -d SystemConfig DataFilePlugin "libcloudio.so" + else + @ENGINE_BINDIR@/mcsSetConfig -d Installation DBRootStorageType "storagemanager" + @ENGINE_BINDIR@/mcsSetConfig -d StorageManager Enabled "Y" + @ENGINE_BINDIR@/mcsSetConfig -d SystemConfig DataFilePlugin "libcloudio.so" + sed -i "s|^service =.*|service = S3|" /etc/columnstore/storagemanager.cnf + if [ ! -z "$MCS_S3_REGION" ]; then + sed -i "s|^region =.*|region = $MCS_S3_REGION|" /etc/columnstore/storagemanager.cnf + fi + if [ ! -z "$MCS_S3_ROLE_NAME" ]; then + sed -i "s|^# iam_role_name =.*|iam_role_name = $MCS_S3_ROLE_NAME|" /etc/columnstore/storagemanager.cnf + fi + if [ ! 
-z "$MCS_S3_STS_REGION" ]; then + sed -i "s|^# sts_region =.*|sts_region = $MCS_S3_STS_REGION|" /etc/columnstore/storagemanager.cnf + fi + if [ ! -z "$MCS_S3_STS_ENDPOINT" ]; then + sed -i "s|^# sts_endpoint =.*|sts_endpoint = $MCS_S3_STS_ENDPOINT|" /etc/columnstore/storagemanager.cnf + fi + if [ ! -z "$MCS_S3_USE_HTTP" ]; then + sed -i "s|^# use_http =.*|use_http = enabled|" /etc/columnstore/storagemanager.cnf + fi + if [ ! -z "$MCS_S3_NO_SSL_VERIFY" ]; then + sed -i "s|^# ssl_verify =.*|ssl_verify = disabled|" /etc/columnstore/storagemanager.cnf + fi + if [ ! -z "$MCS_S3_LIBS3_DEBUG" ]; then + sed -i "s|^# libs3_debug =.*|libs3_debug = enabled|" /etc/columnstore/storagemanager.cnf + fi + sed -i "s|^bucket =.*|bucket = $MCS_S3_BUCKET|" /etc/columnstore/storagemanager.cnf + sed -i "s|^# endpoint =.*|endpoint = $MCS_S3_ENDPOINT|" /etc/columnstore/storagemanager.cnf + sed -i "s|^# aws_access_key_id =.*|aws_access_key_id = $MCS_S3_ACCESS_KEY_ID|" /etc/columnstore/storagemanager.cnf + sed -i "s|^# aws_secret_access_key =.*|aws_secret_access_key = $MCS_S3_SECRET_ACCESS_KEY|" /etc/columnstore/storagemanager.cnf + @ENGINE_BINDIR@/testS3Connection + if [ $? -ne 0 ]; then + sed -i "s|^iam_role_name =.*|# iam_role_name = |" /etc/columnstore/storagemanager.cnf + sed -i "s|^sts_region =.*|# sts_region = |" /etc/columnstore/storagemanager.cnf + sed -i "s|^sts_endpoint =.*|# sts_endpoint = |" /etc/columnstore/storagemanager.cnf + sed -i "s|^endpoint =.*|# endpoint = |" /etc/columnstore/storagemanager.cnf + sed -i "s|^aws_access_key_id =.*|# aws_access_key_id = |" /etc/columnstore/storagemanager.cnf + sed -i "s|^aws_secret_access_key =.*|# aws_secret_access_key = |" /etc/columnstore/storagemanager.cnf + echo "There was an error validating the settings used to access S3." + echo "The specified user or role must have GET, PUT, HEAD, and DELETE permissions to the bucket." 
+ echo "Verify the following environment variables are correct:" + echo "MCS_S3_BUCKET" + echo "MCS_S3_ENDPOINT" + echo "MCS_S3_ACCESS_KEY_ID" + echo "MCS_S3_SECRET_ACCESS_KEY" + echo "MCS_S3_REGION" + echo "MCS_S3_ROLE_NAME (optional)" + echo "MCS_S3_STS_REGION (optional)" + echo "MCS_S3_STS_ENDPOINT (optional)" + echo "After environment variables are fixed, run command: columnstore-post-install" + exit 1 + fi + fi fi #change ownership/permissions to be able to run columnstore as non-root @@ -375,14 +438,14 @@ fi if [ $(running_systemd) -eq 0 ]; then chown -R @DEFAULT_USER@:@DEFAULT_GROUP@ @ENGINE_LOGDIR@ chown -R @DEFAULT_USER@:@DEFAULT_GROUP@ /etc/columnstore - findcmd=`which find 2>/dev/null` + findcmd=$(which find 2>/dev/null) if [ -n "$findcmd" ]; then if [[ $($findcmd @ENGINE_DATADIR@ -maxdepth 3 ! -user @DEFAULT_USER@ -exec echo {} \; -quit 2>/dev/null) ]]; then echo "At least one file is not owned by @DEFAULT_USER@ in @ENGINE_DATADIR@. Running chown..." chown -R @DEFAULT_USER@:@DEFAULT_GROUP@ @ENGINE_DATADIR@ else echo "Confirmed top @ENGINE_DATADIR@ folders owned by @DEFAULT_USER@" - fi + fi else chown -R @DEFAULT_USER@:@DEFAULT_GROUP@ @ENGINE_DATADIR@ fi @@ -390,7 +453,7 @@ if [ $(running_systemd) -eq 0 ]; then chmod 777 /dev/shm fi -systemctl cat mariadb-columnstore.service > /dev/null 2>&1 +systemctl cat mariadb-columnstore.service >/dev/null 2>&1 if [ $? -eq 0 ] && [ $(running_systemd) -eq 0 ]; then # mask mariadb-columnstore service to prevent starting services and dbbuilder @@ -421,13 +484,13 @@ if [ $? -eq 0 ] && [ $(running_systemd) -eq 0 ]; then flock -u "$fd_lock" fi -if [ $stop_mysqld -eq 1 ];then +if [ $stop_mysqld -eq 1 ]; then # Make sure we stop mariadb since it wasn't running prior to columnstore installation - systemctl cat mariadb.service > /dev/null 2>&1 + systemctl cat mariadb.service >/dev/null 2>&1 if [ $? 
-eq 0 ] && [ $(running_systemd) -eq 0 ]; then - systemctl stop mariadb.service > /dev/null 2>&1 + systemctl stop mariadb.service >/dev/null 2>&1 else - pkill mysqld > /dev/null 2>&1 + pkill mysqld >/dev/null 2>&1 fi fi diff --git a/oam/install_scripts/mariadb-columnstore-start.sh.in b/oam/install_scripts/mariadb-columnstore-start.sh.in index f41b6647df..6617355172 100644 --- a/oam/install_scripts/mariadb-columnstore-start.sh.in +++ b/oam/install_scripts/mariadb-columnstore-start.sh.in @@ -14,7 +14,7 @@ flock -n "$fd_lock" || exit 0 /bin/systemctl start mcs-writeengineserver /bin/systemctl start mcs-dmlproc /bin/systemctl start mcs-ddlproc -su -s /bin/sh -c '@ENGINE_BINDIR@/dbbuilder 7' @DEFAULT_USER@ 1> @ENGINE_LOGDIR@/install/dbbuilder.log +su -s /bin/sh -c 'ASAN_OPTIONS=abort_on_error=1:disable_coredump=0:print_stats=false:detect_odr_violation=0:check_initialization_order=1:detect_stack_use_after_return=1:atexit=false:log_path=/core/asan.dbbuilder @ENGINE_BINDIR@/dbbuilder 7' @DEFAULT_USER@ 1> @ENGINE_LOGDIR@/install/dbbuilder.log flock -u "$fd_lock" diff --git a/storage-manager/src/smcat.cpp b/storage-manager/src/smcat.cpp index 48dc50772c..e1655802bc 100644 --- a/storage-manager/src/smcat.cpp +++ b/storage-manager/src/smcat.cpp @@ -44,6 +44,7 @@ bool SMOnline() int err = ::connect(clientSocket, (const struct sockaddr*)&addr, sizeof(addr)); if (err >= 0) { + cerr << "connected to clientSocket; storagemanager is online\n"; ::close(clientSocket); return true; } diff --git a/utils/loggingcpp/exceptclasses.h b/utils/loggingcpp/exceptclasses.h index 4564360669..511c52aa75 100644 --- a/utils/loggingcpp/exceptclasses.h +++ b/utils/loggingcpp/exceptclasses.h @@ -283,6 +283,24 @@ class ProtocolError : public std::logic_error } \ } while (0) +#define idblog(x) \ + do \ + { \ + { \ + std::ostringstream os; \ + \ + os << __FILE__ << "@" << __LINE__ << ": \'" << x << "\'"; \ + std::cerr << os.str() << std::endl; \ + logging::MessageLog logger((logging::LoggingID())); \ + logging::Message 
message; \ + logging::Message::Args args; \ + \ + args.add(os.str()); \ + message.format(args); \ + logger.logErrorMessage(message); \ + } \ + } while (0) + #define idbassert_s(x, s) \ do \ { \ diff --git a/utils/messageqcpp/messagequeue.cpp b/utils/messageqcpp/messagequeue.cpp index a5cad8e492..f66d3ec5e8 100644 --- a/utils/messageqcpp/messagequeue.cpp +++ b/utils/messageqcpp/messagequeue.cpp @@ -246,6 +246,7 @@ const SBS MessageQueueClient::read(const struct timespec* timeout, bool* isTimeO { if (!fClientSock.isOpen()) { + idblog("socket is not open, addr " << addr2String()); fClientSock.open(); try @@ -254,6 +255,7 @@ const SBS MessageQueueClient::read(const struct timespec* timeout, bool* isTimeO } catch (...) { + idblog("cannot connect"); fClientSock.close(); throw; } diff --git a/versioning/BRM/dbrm.cpp b/versioning/BRM/dbrm.cpp index f95b30825c..c87c64d736 100644 --- a/versioning/BRM/dbrm.cpp +++ b/versioning/BRM/dbrm.cpp @@ -3479,6 +3479,7 @@ int DBRM::getSystemReady() throw() { uint32_t stateFlags; + idblog("calling from getSystemReady"); if (getSystemState(stateFlags) < 0) { return -1; @@ -3491,6 +3492,7 @@ int DBRM::getSystemQueryReady() throw() { uint32_t stateFlags; + idblog("calling from getSystemQueryReady"); if (getSystemState(stateFlags) < 0) { return -1; diff --git a/versioning/BRM/extentmap.cpp b/versioning/BRM/extentmap.cpp index ac584b776a..edaa145987 100644 --- a/versioning/BRM/extentmap.cpp +++ b/versioning/BRM/extentmap.cpp @@ -4541,7 +4541,7 @@ void ExtentMap::getDbRootHWMInfo(int OID, uint16_t pmNumber, EmDbRootHWMInfo_v& "There are no DBRoots for OID " << OID << " and PM " << pmNumber << endl; log(oss.str(), logging::LOG_TYPE_CRITICAL); - throw invalid_argument(oss.str()); + // throw invalid_argument(oss.str()); } grabEMEntryTable(READ); diff --git a/versioning/BRM/slavecomm.cpp b/versioning/BRM/slavecomm.cpp index 27e0eece01..358ff52015 100644 --- a/versioning/BRM/slavecomm.cpp +++ b/versioning/BRM/slavecomm.cpp @@ -2116,8 +2116,8 @@ int 
SlaveComm::replayJournal(string prefix) const char* filename = fName.c_str(); - IDBDataFile* journalf = - IDBDataFile::open(IDBPolicy::getType(filename, IDBPolicy::WRITEENG), filename, "rb", 0); + std::unique_ptr<IDBDataFile> journalf = + std::unique_ptr<IDBDataFile>(IDBDataFile::open(IDBPolicy::getType(filename, IDBPolicy::WRITEENG), filename, "rb", 0)); if (!journalf) { diff --git a/versioning/BRM/vbbm.cpp b/versioning/BRM/vbbm.cpp index d490783eb6..b485304e36 100644 --- a/versioning/BRM/vbbm.cpp +++ b/versioning/BRM/vbbm.cpp @@ -986,12 +986,12 @@ void VBBM::loadVersion2(IDBDataFile* in) } size_t readSize = vbbmEntries * sizeof(entry); - char* readBuf = new char[readSize]; + std::unique_ptr<char[]> readBuf(new char[readSize]); size_t progress = 0; int err; while (progress < readSize) { - err = in->read(readBuf + progress, readSize - progress); + err = in->read(readBuf.get() + progress, readSize - progress); if (err < 0) { log_errno("VBBM::load()"); @@ -1005,7 +1005,7 @@ void VBBM::loadVersion2(IDBDataFile* in) progress += err; } - VBBMEntry* loadedEntries = (VBBMEntry*)readBuf; + VBBMEntry* loadedEntries = reinterpret_cast<VBBMEntry*>(readBuf.get()); for (i = 0; i < vbbmEntries; i++) insert(loadedEntries[i].lbid, loadedEntries[i].verID, loadedEntries[i].vbOID, loadedEntries[i].vbFBO, true); diff --git a/versioning/BRM/vss.cpp b/versioning/BRM/vss.cpp index 522fc378ee..b437f670c0 100644 --- a/versioning/BRM/vss.cpp +++ b/versioning/BRM/vss.cpp @@ -1402,12 +1402,12 @@ void VSS::load(string filename) */ size_t readSize = header.entries * sizeof(entry); - char* readBuf = new char[readSize]; + std::unique_ptr<char[]> readBuf(new char[readSize]); size_t progress = 0; int err; while (progress < readSize) { - err = in->read(readBuf + progress, readSize - progress); + err = in->read(readBuf.get() + progress, readSize - progress); if (err < 0) { log_errno("VBBM::load()"); @@ -1421,7 +1421,7 @@ void VSS::load(string filename) progress += err; } - VSSEntry* loadedEntries = (VSSEntry*)readBuf; + VSSEntry* loadedEntries = 
reinterpret_cast<VSSEntry*>(readBuf.get()); for (i = 0; i < header.entries; i++) insert(loadedEntries[i].lbid, loadedEntries[i].verID, loadedEntries[i].vbFlag, loadedEntries[i].locked, true); diff --git a/writeengine/client/we_clients.cpp b/writeengine/client/we_clients.cpp index 1badf9ab44..277b744853 100644 --- a/writeengine/client/we_clients.cpp +++ b/writeengine/client/we_clients.cpp @@ -154,6 +154,22 @@ struct QueueShutdown x.shutdown(); } }; + +/** + + * This function checks if the WriteEngineServer (WES) is configured + * for the specified node in the configuration. + * @param config Pointer to the configuration object + * @param fOtherEnd The name of the node to check + * @return true if WES is configured, false otherwise + */ +bool isWESConfigured(config::Config* config, const std::string& fOtherEnd) +{ + // Check if WES IP address record exists in the config (if not, this is a read-only node) + std::string otherEndDnOrIPStr = config->getConfig(fOtherEnd, "IPAddr"); + return !(otherEndDnOrIPStr.empty() || otherEndDnOrIPStr == "unassigned"); +} + } // namespace namespace WriteEngine @@ -224,6 +240,13 @@ void WEClients::Setup() snprintf(buff, sizeof(buff), "pm%u_WriteEngineServer", moduleID); string fServer(buff); + // Check if WES is configured for this module + if (!isWESConfigured(rm->getConfig(), fServer)) + { + writeToLog(__FILE__, __LINE__, "Skipping WriteEngineServer client creation for " + fServer + " as the node is read-only", LOG_TYPE_INFO); + continue; + } + boost::shared_ptr<MessageQueueClient> cl(new MessageQueueClient(fServer, rm->getConfig())); boost::shared_ptr<boost::mutex> nl(new boost::mutex()); @@ -287,6 +310,11 @@ void WEClients::Setup() } } +bool WEClients::isConnectionReadonly(uint32_t connection) +{ + return fPmConnections[connection] == nullptr; +} + int WEClients::Close() { makeBusy(false); @@ -315,13 +343,17 @@ void WEClients::Listen(boost::shared_ptr<MessageQueueClient> client, uint32_t co { SBS sbs; + idblog("conn index " << connIndex << ", other end " << client->otherEnd()); + try { while 
(Busy()) { // TODO: This call blocks so setting Busy() in another thread doesn't work here... + idblog("receiving packet. conn index " << connIndex << ", other end " << client->otherEnd()); sbs = client->read(); + idblog("received packet. conn index " << connIndex << ", other end " << client->otherEnd() << ", length " << sbs->length()); if (sbs->length() != 0) { // cout << "adding data to connIndex " << endl; @@ -335,6 +367,7 @@ void WEClients::Listen(boost::shared_ptr client, uint32_t co } cerr << "WEC got 0 byte message for object " << this << endl; + idblog("0 byte message"); goto Error; } } @@ -344,17 +377,20 @@ void WEClients::Listen(boost::shared_ptr client, uint32_t co catch (std::exception& e) { cerr << "WEC Caught EXCEPTION: " << e.what() << endl; + idblog("exception " << e.what()); goto Error; } catch (...) { cerr << "WEC Caught UNKNOWN EXCEPT" << endl; + idblog("unknown error "); goto Error; } Error: // error condition! push 0 length bs to messagequeuemap and // eventually let jobstep error out. + idblog("Error"); boost::mutex::scoped_lock lk(fMlock); MessageQueueMap::iterator map_tok; @@ -478,8 +514,9 @@ void WEClients::write(const messageqcpp::ByteStream& msg, uint32_t connection) fPmConnections[connection]->write(msg); else { + // new behavior: connection client is nullptr means it is read-only. 
ostringstream os; - os << "Lost connection to WriteEngineServer on pm" << connection; + os << "Connection to readonly pm" << connection; throw runtime_error(os.str()); } } diff --git a/writeengine/client/we_clients.h b/writeengine/client/we_clients.h index 8a512f7323..29abdfc5f1 100644 --- a/writeengine/client/we_clients.h +++ b/writeengine/client/we_clients.h @@ -114,6 +114,18 @@ class WEClients return pmCount; } + uint32_t getRWConnections() + { + uint32_t count = 0; + for (uint32_t i = 0; i < fPmConnections.size(); i++) + { + count += fPmConnections[i] != nullptr; + } + return count; + } + + bool isConnectionReadonly(uint32_t connection); + private: WEClients(const WEClients& weClient); WEClients& operator=(const WEClients& weClient); diff --git a/writeengine/server/we_dmlcommandproc.cpp b/writeengine/server/we_dmlcommandproc.cpp index fff5ece087..584458e27c 100644 --- a/writeengine/server/we_dmlcommandproc.cpp +++ b/writeengine/server/we_dmlcommandproc.cpp @@ -1478,8 +1478,13 @@ uint8_t WE_DMLCommandProc::processBatchInsert(messageqcpp::ByteStream& bs, std:: } // MCOL-1495 Remove fCatalogMap entries CS won't use anymore. - CalpontSystemCatalog::removeCalpontSystemCatalog(sessionId); + idblog("just before systemCatalogPtr reset"); + systemCatalogPtr.reset(); // ??? XXX ??? 
+ idblog("right after systemCatalogPtr reset"); CalpontSystemCatalog::removeCalpontSystemCatalog(sessionId | 0x80000000); + idblog("right after removing local sessionId"); + CalpontSystemCatalog::removeCalpontSystemCatalog(sessionId); + idblog("right after removing sessionId"); return rc; } diff --git a/writeengine/server/we_readthread.cpp b/writeengine/server/we_readthread.cpp index 8f6eb600dd..58aab13d54 100644 --- a/writeengine/server/we_readthread.cpp +++ b/writeengine/server/we_readthread.cpp @@ -888,6 +888,8 @@ void ReadThreadFactory::CreateReadThread(ThreadPool& Tp, IOSocket& Ios, BRM::DBR aBs.peek(msgId); + idblog("msgId " << int(msgId)); + switch (msgId) { case WE_SVR_DDL_KEEPALIVE: diff --git a/writeengine/server/we_server.cpp b/writeengine/server/we_server.cpp index 1388516094..cdc425e159 100644 --- a/writeengine/server/we_server.cpp +++ b/writeengine/server/we_server.cpp @@ -278,6 +278,7 @@ int ServiceWriteEngine::Child() ThreadPool tp(mt, qs); cout << "WriteEngineServer is ready" << endl; + idblog("WriteEngineServer is ready (look for PID)"); NotifyServiceStarted(); BRM::DBRM dbrm;