Skip to content

MCOL-5861 cmapi read only nodes #3432

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 13 commits into
base: stable-23.10
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 20 additions & 9 deletions cmapi/cmapi_server/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"""
import os
from typing import NamedTuple
from enum import Enum


# default MARIADB ColumnStore config path
Expand Down Expand Up @@ -53,6 +54,16 @@
CMAPI_INSTALL_PATH, 'cmapi_server/SingleNode.xml'
)

class MCSProgs(Enum):
STORAGE_MANAGER = 'StorageManager'
WORKER_NODE = 'workernode'
CONTROLLER_NODE = 'controllernode'
PRIM_PROC = 'PrimProc'
EXE_MGR = 'ExeMgr'
WRITE_ENGINE_SERVER = 'WriteEngineServer'
DML_PROC = 'DMLProc'
DDL_PROC = 'DDLProc'

# constants for dispatchers
class ProgInfo(NamedTuple):
"""NamedTuple for some additional info about handling mcs processes."""
Expand All @@ -66,17 +77,17 @@ class ProgInfo(NamedTuple):
# on top level of process handling
# mcs-storagemanager starts conditionally inside mcs-loadbrm, but should be
# stopped using cmapi
ALL_MCS_PROGS = {
ALL_MCS_PROGS: dict[MCSProgs, ProgInfo] = {
# workernode starts on primary and non primary node with 1 or 2 added
# to subcommand (DBRM_Worker1 - on primary, DBRM_Worker2 - non primary)
'StorageManager': ProgInfo(15, 'mcs-storagemanager', '', False, 1),
'workernode': ProgInfo(13, 'mcs-workernode', 'DBRM_Worker{}', False, 1),
'controllernode': ProgInfo(11, 'mcs-controllernode', 'fg', True),
'PrimProc': ProgInfo(5, 'mcs-primproc', '', False, 1),
'ExeMgr': ProgInfo(9, 'mcs-exemgr', '', False, 1),
'WriteEngineServer': ProgInfo(7, 'mcs-writeengineserver', '', False, 3),
'DMLProc': ProgInfo(3, 'mcs-dmlproc', '', False),
'DDLProc': ProgInfo(1, 'mcs-ddlproc', '', False),
MCSProgs.STORAGE_MANAGER: ProgInfo(15, 'mcs-storagemanager', '', False, 1),
MCSProgs.WORKER_NODE: ProgInfo(13, 'mcs-workernode', 'DBRM_Worker{}', False, 1),
MCSProgs.CONTROLLER_NODE: ProgInfo(11, 'mcs-controllernode', 'fg', True),
MCSProgs.PRIM_PROC: ProgInfo(5, 'mcs-primproc', '', False, 1),
MCSProgs.EXE_MGR: ProgInfo(9, 'mcs-exemgr', '', False, 1),
MCSProgs.WRITE_ENGINE_SERVER: ProgInfo(7, 'mcs-writeengineserver', '', False, 3),
MCSProgs.DML_PROC: ProgInfo(3, 'mcs-dmlproc', '', False),
MCSProgs.DDL_PROC: ProgInfo(1, 'mcs-ddlproc', '', False),
}

# constants for docker container dispatcher
Expand Down
1 change: 1 addition & 0 deletions cmapi/cmapi_server/controllers/api_clients.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ def add_node(
:param node_info: Information about the node to add.
:return: The response from the API.
"""
#TODO: fix interface as in remove_node used or think about universal
return self._request('PUT', 'node', {**node_info, **extra})

def remove_node(
Expand Down
13 changes: 8 additions & 5 deletions cmapi/cmapi_server/controllers/endpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ def put_config(self):
MCSProcessManager.stop_node(
is_primary=node_config.is_primary_node(),
use_sudo=use_sudo,
timeout=request_timeout
timeout=request_timeout,
)
except CMAPIBasicError as err:
raise_422_error(
Expand Down Expand Up @@ -463,6 +463,7 @@ def put_config(self):
MCSProcessManager.start_node(
is_primary=node_config.is_primary_node(),
use_sudo=use_sudo,
is_read_only=node_config.is_read_only(),
)
except CMAPIBasicError as err:
raise_422_error(
Expand Down Expand Up @@ -666,7 +667,8 @@ def put_start(self):
try:
MCSProcessManager.start_node(
is_primary=node_config.is_primary_node(),
use_sudo=use_sudo
use_sudo=use_sudo,
is_read_only=node_config.is_read_only(),
)
except CMAPIBasicError as err:
raise_422_error(
Expand Down Expand Up @@ -701,7 +703,7 @@ def put_shutdown(self):
MCSProcessManager.stop_node(
is_primary=node_config.is_primary_node(),
use_sudo=use_sudo,
timeout=timeout
timeout=timeout,
)
except CMAPIBasicError as err:
raise_422_error(
Expand Down Expand Up @@ -910,16 +912,17 @@ def put_add_node(self):
node = request_body.get('node', None)
config = request_body.get('config', DEFAULT_MCS_CONF_PATH)
in_transaction = request_body.get('in_transaction', False)
read_only = request_body.get('read_only', False)

if node is None:
raise_422_error(module_logger, func_name, 'missing node argument')

try:
if not in_transaction:
with TransactionManager(extra_nodes=[node]):
response = ClusterHandler.add_node(node, config)
response = ClusterHandler.add_node(node, config, read_only)
else:
response = ClusterHandler.add_node(node, config)
response = ClusterHandler.add_node(node, config, read_only)
except CMAPIBasicError as err:
raise_422_error(module_logger, func_name, err.message)

Expand Down
28 changes: 20 additions & 8 deletions cmapi/cmapi_server/handlers/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
get_current_key, get_version, update_revision_and_manager,
)
from cmapi_server.node_manipulation import (
add_node, add_dbroot, remove_node, switch_node_maintenance,
add_node, add_dbroot, remove_node, switch_node_maintenance, update_dbroots_of_readonly_nodes,
)
from mcs_node_control.models.misc import get_dbrm_master
from mcs_node_control.models.node_config import NodeConfig
Expand Down Expand Up @@ -139,14 +139,19 @@ def shutdown(
return {'timestamp': operation_start_time}

@staticmethod
def add_node(node: str, config: str = DEFAULT_MCS_CONF_PATH) -> dict:
def add_node(
node: str, config: str = DEFAULT_MCS_CONF_PATH,
read_only: bool = False,
) -> dict:
"""Method to add node to MCS CLuster.

:param node: node IP or name or FQDN
:type node: str
:param config: columnstore xml config file path,
defaults to DEFAULT_MCS_CONF_PATH
:type config: str, optional
:param read_only: add node in read-only mode, defaults to False
:type read_only: bool, optional
:raises CMAPIBasicError: on exception while starting transaction
:raises CMAPIBasicError: if transaction start isn't successful
:raises CMAPIBasicError: on exception while adding node
Expand All @@ -157,20 +162,25 @@ def add_node(node: str, config: str = DEFAULT_MCS_CONF_PATH) -> dict:
:rtype: dict
"""
logger: logging.Logger = logging.getLogger('cmapi_server')
logger.debug(f'Cluster add node command called. Adding node {node}.')
logger.debug(
f'Cluster add node command called. Adding node {node} in '
f'{"read-only" if read_only else "read-write"} mode.'
)

response = {'timestamp': str(datetime.now())}

try:
add_node(
node, input_config_filename=config,
output_config_filename=config
output_config_filename=config,
read_only=read_only,
)
if not get_dbroots(node, config):
add_dbroot(
host=node, input_config_filename=config,
output_config_filename=config
)
if not read_only: # Read-only nodes don't own dbroots
add_dbroot(
host=node, input_config_filename=config,
output_config_filename=config
)
except Exception as err:
raise CMAPIBasicError('Error while adding node.') from err

Expand Down Expand Up @@ -213,6 +223,8 @@ def remove_node(node: str, config: str = DEFAULT_MCS_CONF_PATH) -> dict:
node, input_config_filename=config,
output_config_filename=config
)
with NodeConfig().modify_config(config) as root:
update_dbroots_of_readonly_nodes(root)
except Exception as err:
raise CMAPIBasicError('Error while removing node.') from err

Expand Down
5 changes: 3 additions & 2 deletions cmapi/cmapi_server/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import socket
import time
from collections import namedtuple
from functools import partial
from random import random
from shutil import copyfile
from typing import Tuple, Optional
Expand Down Expand Up @@ -379,7 +378,7 @@ async def update_config(node: str, headers: dict, body: dict) -> None:
) as response:
resp_json = await response.json(encoding='utf-8')
response.raise_for_status()
logging.info(f'Node {node} config put successfull.')
logging.info(f'Node {node} config put successful.')
except aiohttp.ClientResponseError as err:
# TODO: may be better to check if resp status is 422 cause
# it's like a signal that cmapi server raised it in
Expand Down Expand Up @@ -577,6 +576,7 @@ def get_dbroots(node, config=DEFAULT_MCS_CONF_PATH):
dbroots = []
smc_node = root.find('./SystemModuleConfig')
mod_count = int(smc_node.find('./ModuleCount3').text)

for i in range(1, mod_count+1):
ip_addr = smc_node.find(f'./ModuleIPAddr{i}-1-3').text
hostname = smc_node.find(f'./ModuleHostName{i}-1-3').text
Expand All @@ -596,6 +596,7 @@ def get_dbroots(node, config=DEFAULT_MCS_CONF_PATH):
dbroots.append(
smc_node.find(f"./ModuleDBRootID{i}-{j}-3").text
)

return dbroots


Expand Down
3 changes: 2 additions & 1 deletion cmapi/cmapi_server/logging_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,8 @@ def enable_console_logging(logger: logging.Logger) -> None:
def config_cmapi_server_logging():
# add custom level TRACE only for develop purposes
# could be activated using API endpoints or cli tool without relaunching
add_logging_level('TRACE', 5)
if not hasattr(logging, 'TRACE'):
add_logging_level('TRACE', 5)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

JFYI I think to remove or rewrite this dev function.

cherrypy._cplogging.LogManager.error = custom_cherrypy_error
# reconfigure cherrypy.access log message format
# Default access_log_format '{h} {l} {u} {t} "{r}" {s} {b} "{f}" "{a}"'
Expand Down
52 changes: 36 additions & 16 deletions cmapi/cmapi_server/managers/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
import psutil

from cmapi_server.exceptions import CMAPIBasicError
from cmapi_server.constants import MCS_INSTALL_BIN, ALL_MCS_PROGS
from cmapi_server.constants import MCS_INSTALL_BIN, ALL_MCS_PROGS, MCSProgs, ProgInfo
from cmapi_server.process_dispatchers.base import BaseDispatcher
from cmapi_server.process_dispatchers.systemd import SystemdDispatcher
from cmapi_server.process_dispatchers.container import (
ContainerDispatcher
Expand All @@ -18,7 +19,7 @@
from mcs_node_control.models.process import Process


PROCESS_DISPATCHERS = {
PROCESS_DISPATCHERS: dict[str, type[BaseDispatcher]] = {
'systemd': SystemdDispatcher,
# could be used in docker containers and OSes w/o systemd
'container': ContainerDispatcher,
Expand All @@ -32,10 +33,10 @@ class MCSProcessManager:
e.g. re/-start or stop systemd services, run executable.
"""
CONTROLLER_MAX_RETRY = 30
mcs_progs = {}
mcs_progs: dict[str, ProgInfo] = {}
mcs_version_info = None
dispatcher_name = None
process_dispatcher = None
process_dispatcher: BaseDispatcher = None

@classmethod
def _get_prog_name(cls, name: str) -> str:
Expand All @@ -47,12 +48,13 @@ def _get_prog_name(cls, name: str) -> str:
:rtype: str
"""
if cls.dispatcher_name == 'systemd':
return ALL_MCS_PROGS[name].service_name
prog = MCSProgs(name)
return ALL_MCS_PROGS[prog].service_name
return name

@classmethod
def _get_sorted_progs(
cls, is_primary: bool, reverse: bool = False
cls, is_primary: bool, reverse: bool = False, is_read_only: bool = False
) -> dict:
"""Get sorted services dict.

Expand All @@ -72,6 +74,13 @@ def _get_sorted_progs(
for prog_name, prog_info in cls.mcs_progs.items()
if prog_name not in PRIMARY_PROGS
}

if is_read_only:
logging.debug('Node is in read-only mode, skipping WriteEngine')
unsorted_progs.pop(
MCSProgs.WRITE_ENGINE_SERVER.value, None
)

if reverse:
# stop sequence builds using stop_priority property
return dict(
Expand All @@ -89,7 +98,8 @@ def _detect_processes(cls) -> None:
if cls.mcs_progs:
logging.warning('Mcs ProcessHandler already detected processes.')

for prog_name, prog_info in ALL_MCS_PROGS.items():
for prog, prog_info in ALL_MCS_PROGS.items():
prog_name = prog.value
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

JFYI It's what I mentioned before here about naming and access names. prog.value == prog_name. It's somehow confusing.

if os.path.exists(os.path.join(MCS_INSTALL_BIN, prog_name)):
cls.mcs_progs[prog_name] = prog_info

Expand Down Expand Up @@ -404,37 +414,47 @@ def is_node_processes_ok(
return set(node_progs) == set(p['name'] for p in running_procs)

@classmethod
def start_node(cls, is_primary: bool, use_sudo: bool = True):
def start_node(
cls,
is_primary: bool,
use_sudo: bool = True,
is_read_only: bool = False,
) -> None:
"""Start mcs node processes.

:param is_primary: is node primary or not, defaults to True
:type is_primary: bool
:param use_sudo: use sudo or not, defaults to True
:type use_sudo: bool, optional
:param is_read_only: if true, doesn't start WriteEngine
:type is_read_only: bool, optional
:raises CMAPIBasicError: immediately if one mcs process not started
"""
for prog_name in cls._get_sorted_progs(is_primary):
for prog_name in cls._get_sorted_progs(is_primary=is_primary, is_read_only=is_read_only):
if (
cls.dispatcher_name == 'systemd'
and prog_name == 'StorageManager'
and prog_name == MCSProgs.STORAGE_MANAGER.value
):
# TODO: MCOL-5458
logging.info(
f'Skip starting {prog_name} with systemd dispatcher.'
)
continue
# TODO: additional error handling
if prog_name == 'controllernode':
if prog_name == MCSProgs.CONTROLLER_NODE.value:
cls._wait_for_workernodes()
if prog_name in ('DMLProc', 'DDLProc'):
if prog_name in (MCSProgs.DML_PROC.value, MCSProgs.DDL_PROC.value):
cls._wait_for_controllernode()
if not cls.start(prog_name, is_primary, use_sudo):
logging.error(f'Process "{prog_name}" not started properly.')
raise CMAPIBasicError(f'Error while starting "{prog_name}".')

@classmethod
def stop_node(
cls, is_primary: bool, use_sudo: bool = True, timeout: int = 10
cls,
is_primary: bool,
use_sudo: bool = True,
timeout: int = 10,
):
"""Stop mcs node processes.

Expand All @@ -450,14 +470,14 @@ def stop_node(
# so use full available list of processes. Otherwise, it could cause
# undefined behaviour when primary gone and then recovers (failover
# triggered 2 times).
for prog_name in cls._get_sorted_progs(True, reverse=True):
for prog_name in cls._get_sorted_progs(is_primary=True, reverse=True):
if not cls.stop(prog_name, is_primary, use_sudo):
logging.error(f'Process "{prog_name}" not stopped properly.')
raise CMAPIBasicError(f'Error while stopping "{prog_name}"')

@classmethod
def restart_node(cls, is_primary: bool, use_sudo: bool):
def restart_node(cls, is_primary: bool, use_sudo: bool, is_read_only: bool = False):
"""TODO: For next releases."""
if cls.get_running_mcs_procs():
cls.stop_node(is_primary, use_sudo)
cls.start_node(is_primary, use_sudo)
cls.start_node(is_primary, use_sudo, is_read_only)
4 changes: 2 additions & 2 deletions cmapi/cmapi_server/managers/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,10 @@ def rollback_transaction(self, nodes: Optional[list] = None) -> None:
try:
rollback_transaction(self.txn_id, nodes=nodes)
self.active_transaction = False
logging.debug(f'Success rollback of transaction "{self.txn_id}".')
logging.debug(f'Successful rollback of transaction "{self.txn_id}".')
except Exception:
logging.error(
f'Error while rollback transaction "{self.txn_id}"',
f'Error while rolling back transaction "{self.txn_id}"',
exc_info=True
)

Expand Down
Loading