관리-도구
편집 파일: cloudlinux_cli_user.py
# coding:utf-8 # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved # # Licensed under CLOUD LINUX LICENSE AGREEMENT # http://cloudlinux.com/docs/LICENSE.TXT from __future__ import print_function from __future__ import division from __future__ import absolute_import import json import logging import subprocess import os import sys from libcloudlinux import ( CloudlinuxCliBase, LVEMANAGER_PLUGIN_NAMES, DEFAULT_PLUGIN_NAME, PASSENGER_DEPEND_PLUGINS, AllLimitStrategy, NoLimitStrategy, LimitStrategyHeavy, NoCagefsStrategy, LimitStrategyBase, ConfigLimitValue, ) from clselector.clpassenger_detectlib import is_clpassenger_active from clcommon import ClPwd from clcommon.utils import is_litespeed_running from clcommon.lib.cledition import is_cl_solo_edition from cldetectlib import get_param_from_file from clcommon.const import Feature from clcommon.cpapi import is_panel_feature_supported CONFIG = '/etc/sysconfig/cloudlinux' SMART_ADVICE_USER_CLI = '/opt/alt/php-xray/cl-smart-advice-user' PERCENTS_STATS_MODE_FLAG = '/opt/cloudlinux/flags/enabled-flags.d/percentage-user-stats-mode.flag' # NB: this logger's out is stderr, result JSON out is stdout - so with active logger web will not work properly # because of stderr redirection 2>&1 # so it is MUST be silent(NOTSET) in normal situation # also it is not possible to use file logger here - script works inside the cagefs with user's rights logger = logging.getLogger(__name__) logger.setLevel(logging.NOTSET) init_formatter = logging.Formatter('[%(asctime)s] %(funcName)s:%(lineno)s - %(message)s') cagefs_formatter = logging.Formatter('{cagefs} [%(asctime)s] %(funcName)s:%(lineno)s - %(message)s') h = logging.StreamHandler() h.setFormatter(init_formatter) logger.addHandler(h) logger.debug('cli start') class CloudlinuxCliUser(CloudlinuxCliBase): limit_strategy: LimitStrategyBase def __init__(self): self.web_resource_limit_mode = ConfigLimitValue.ALL limit_mode = get_param_from_file(CONFIG, 'web_resource_limit_mode', '=', ConfigLimitValue.ALL.value) self.web_resource_limit_mode = ConfigLimitValue(limit_mode) super(CloudlinuxCliUser, self).__init__() self.command_methods.update({ 'spa-get-domains': self.spa_user_domains, 'spa-get-homedir': self.spa_user_homedir, 'cloudlinux-snapshots': self.cl_snapshots, 'spa-get-user-info': self.spa_get_user_info }) def __init_limit_strategy(self): """ Set default strategy from the `CONFIG` values """ if self.skip_cagefs_check: logger.handlers[0].setFormatter(cagefs_formatter) # update log format to easier log review # we cannot use cagefs when it is not available if not is_panel_feature_supported(Feature.CAGEFS): self.limit_strategy = NoCagefsStrategy() else: self.limit_strategy = { ConfigLimitValue.ALL: AllLimitStrategy, ConfigLimitValue.HEAVY: LimitStrategyHeavy, ConfigLimitValue.UNLIMITED: NoLimitStrategy, }.get(self.web_resource_limit_mode, AllLimitStrategy)() logger.debug( f'Limits strategy inited as {self.limit_strategy.__class__}' f'\n\tBecause of:' f'\n\tself.web_resource_limit_mode: {self.web_resource_limit_mode}' ) def set_limit_strategy(self, strategy: LimitStrategyBase): logger.debug(f'Limit strategy is explicitly set to {strategy.__class__}') self.limit_strategy = strategy def __check_exclusive_commands(self): """ Check is command currently run without cagefs; commands list is taken from Spa.php `processRequest()` This function should be removed in the same task as the `LimitStrategyNoCagefs` """ data = self.request_data if data.get('params', {}).get('interpreter') == 'php' or data.get('command') in { 'cloudlinux-statistics', 'cloudlinux-quota', 'cloudlinux-top', 'cloudlinux-snapshots', 'cloudlinux-charts', 'cloudlinux-statsnotifier', 'spa-get-user-info', 'cloudlinux-awp-user', 'cloudlinux-xray-user-manager', 'spa-get-domains', 'cpanel-api', 'cl-smart-advice-user', }: logger.debug('Executable command found in the exclusive list') self.set_limit_strategy(NoCagefsStrategy()) def drop_permission(self): """ Drop permission to users, if owner of script is user :return: """ logger.debug( 'drop permissions start' f'\n\targv is: {sys.argv}' f'\n\trequest data is: {self.request_data}' ) self.__init_limit_strategy() data = self.request_data if data['owner'] != 'user': self.exit_with_error("User not allowed") super(CloudlinuxCliUser, self).drop_permission() args = self.prepair_params_for_command() logger.debug(f'prepared args is: {args}') if data.get('command'): if self.skip_cagefs_check: logger.debug('cagefs skipped: --skip-cagefs-check arg found') else: self.__check_exclusive_commands() # if rc is None - script won't enter the cagefs # otherwise - command is executed in the cagefs rc = self.limit_strategy.execute(data['command'], args, self.request_data) if rc is not None: logger.debug(f'command executed inside of the cagefs with rc: {rc}') sys.exit(rc) else: logger.debug(f'cagefs skipped: strategy is {self.limit_strategy.__class__}') # skip checking plugin availability on spa-get-user-info if data.get('command') != 'spa-get-user-info': self.check_plugin_availability() logger.debug('drop permissons end') def spa_user_domains(self): print(json.dumps({"result": "success", "list": self.get_user_domains()})) sys.exit(0) def spa_user_homedir(self): print(json.dumps({"result": "success", "homedir": self.get_user_homedir()})) sys.exit(0) def spa_get_user_info(self): try: print(json.dumps( { "result": "success", "domains": self.get_user_domains(), "homedir": self.get_user_homedir(), "is_litespeed_running": is_litespeed_running(), "is_cl_solo_edition": is_cl_solo_edition(skip_jwt_check=True), "smart_advice": os.path.isfile(SMART_ADVICE_USER_CLI), "is_lve_supported": is_panel_feature_supported(Feature.LVE), "user_stats_mode": self.get_stats_mode(), "server_ip": self.get_server_ip() })) except: self.exit_with_error('Module unavailable') sys.exit(0) def get_user_domains(self): try: from clcommon.cpapi import userdomains except: self.exit_with_error('Module unavailable') return [x[0] for x in userdomains(self.user_info['username'])] def get_stats_mode(self): if os.path.isfile(PERCENTS_STATS_MODE_FLAG): return 'percent' return 'default' def get_user_homedir(self): try: pwdir = ClPwd().get_homedir(self.user_info['username']) return pwdir + "/" except KeyError: self.exit_with_error('No such user') def cl_snapshots(self): list_to_request = self.prepair_params_for_command() try: output = self.run_util('/usr/sbin/lve-read-snapshot', *list_to_request) except subprocess.CalledProcessError as processError: output = processError.output try: result = json.loads(output) except: self.exit_with_error(output) return self.exit_with_success({'data': result['data']}) sys.exit(0) def check_plugin_availability(self): plugin_names = { 'nodejs_selector': 'Node.js Selector', 'python_selector': 'Python Selector', } selector_enabled = True manager = None try: if self.current_plugin_name == 'nodejs_selector': from clselect.clselectnodejs.node_manager import NodeManager manager = NodeManager() if self.current_plugin_name == 'python_selector': from clselect.clselectpython.python_manager import PythonManager manager = PythonManager() if manager: selector_enabled = manager.selector_enabled except: selector_enabled = False if not selector_enabled: self.exit_with_error( code=503, error_id='ERROR.not_available_plugin', context={'pluginName': plugin_names.get(self.current_plugin_name, 'Plugin')}, icon='disabled') plugin_available_checker = { 'nodejs_selector': self._plugin_available_nodejs, 'python_selector': self._plugin_available_python, 'php_selector': self._plugin_available_php, 'resource_usage': self._plugin_available_resource_usage, }.get(self.current_plugin_name) if plugin_available_checker: plugin_available = plugin_available_checker() else: plugin_available = True if not is_clpassenger_active() and self.current_plugin_name in PASSENGER_DEPEND_PLUGINS: self.exit_with_error( code=503, error_id='ERROR.not_available_passenger', context={'pluginName': LVEMANAGER_PLUGIN_NAMES.get(self.current_plugin_name, DEFAULT_PLUGIN_NAME)}, icon='disabled') if not plugin_available: self.exit_with_error( code=503, error_id='ERROR.not_available_plugin', context={'pluginName': LVEMANAGER_PLUGIN_NAMES.get(self.current_plugin_name, DEFAULT_PLUGIN_NAME)}, icon='disabled') def _plugin_available_nodejs(self): try: from clselect.clselectnodejs.node_manager import NodeManager manager = NodeManager() if not manager.selector_enabled or not is_clpassenger_active(): return False except: return False return True def _plugin_available_python(self): try: from clselect.clselectpython.python_manager import PythonManager manager = PythonManager() if not manager.selector_enabled or not is_clpassenger_active(): return False except: return False return True def _plugin_available_php(self): try: from clselect.clselectphp.php_manager import PhpManager manager = PhpManager() if not manager.selector_enabled: return False except: return False return True def _plugin_available_resource_usage(self): return True