Server IP : 184.154.167.98 / Your IP : 13.59.218.229 Web Server : Apache System : Linux pink.dnsnetservice.com 4.18.0-553.22.1.lve.1.el8.x86_64 #1 SMP Tue Oct 8 15:52:54 UTC 2024 x86_64 User : puertode ( 1767) PHP Version : 7.2.34 Disable Function : NONE MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : ON | Sudo : ON | Pkexec : ON Directory : /usr/share/cagefs/ |
Upload File : |
# -*- coding: utf-8 -*- # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved # # Licensed under CLOUD LINUX LICENSE AGREEMENT # http://cloudlinux.com/docs/LICENSE.TXT from __future__ import print_function from __future__ import absolute_import from __future__ import division from __future__ import unicode_literals from future import standard_library standard_library.install_aliases() from builtins import * import os import pwd import grp import glob import subprocess import re from clcommon import clconfpars from itertools import chain from collections import namedtuple from cagefsctl import ( MountpointConfig, DISABLE_ETCFS, get_cagefs_users, enabled_dir, disabled_dir, build_wrappers_dicts, ) from cagefslib import CageFSException from clcagefslib.selector.configure import is_ea4_enabled, read_cpanel_ea4_php_conf cldiaglib_found = True try: from cldiaglib import ( runner, ChkResult, OK, FAILED, SKIPPED, ) except ImportError: cldiaglib_found = False # Possible result types OK = 'OK' FAILED = 'FAILED' SKIPPED = 'SKIPPED' INTERNAL_TEST_ERROR = 'INTERNAL_TEST_ERROR' ChkResult = namedtuple('ChkResult', [ 'res', # One of predefined checker result types 'msg', # Resulting msg from this checker ]) def check_cagefs_mount_points_exists(): # We don't check personal mounts because they could be virtual # and don't use read_only_mounts list because it's already in "mounts" mp_config = MountpointConfig(skip_errors=True, ignore_cache=True) missing = [] for p in chain(mp_config.common_mounts, mp_config.splitted_by_username_mounts, mp_config.splitted_by_uid_mounts): t = p.strip() if not os.path.isdir(t): missing.append(t) if missing: return ChkResult(FAILED, 'There are missing mount points: {}'.format(missing)) return ChkResult(OK, 'No missing mount points found') check_cagefs_mount_points_exists.pretty_name = 'Check cagefs mount points exists' def check_cagefs_enabled_users_isdir(): if os.path.exists(enabled_dir) and not os.path.isdir(enabled_dir): return ChkResult(FAILED, "{} is not a directory".format(enabled_dir)) return ChkResult(OK, '{} is fine'.format(enabled_dir)) check_cagefs_enabled_users_isdir.pretty_name = 'Check cagefs users.enabled is directory' def check_cagefs_disabled_users_isdir(): if os.path.exists(disabled_dir) and not os.path.isdir(disabled_dir): return ChkResult(FAILED, "{} is not a directory".format(disabled_dir)) return ChkResult(OK, '{} is fine'.format(disabled_dir)) check_cagefs_disabled_users_isdir.pretty_name = 'Check cagefs users.disabled is directory' def check_cagefs_disabled_etcfs_exists(): if not os.path.exists(DISABLE_ETCFS): return ChkResult(FAILED, "{} doesn't exists".format(DISABLE_ETCFS)) return ChkResult(OK, '{} exists'.format(DISABLE_ETCFS)) check_cagefs_disabled_etcfs_exists.pretty_name = 'Check cagefs disable.etcfs exists' def get_cagefs_user_for_test(groups, all_enabled_users): """ Filter out users that are in super groups and return username and uid of cagefs user for test :param groups: list of super groups :type groups: list of str :param all_enabled_users: list of cagefs users to filter :type all_enabled_users: list of str :rtype tuple (user, uid) or (None, None) when user not found """ super_gids = set() # set of gids of super groups from pam_lve config super_uids = set() # set of uids of members of super groups re_pattern = re.compile('^cldiaguser_[a-f0-9]{21}$') for group in groups: try: g = grp.getgrnam(group) except KeyError: continue super_gids.add(g.gr_gid) for user in g.gr_mem: try: p = pwd.getpwnam(user) except KeyError: continue super_uids.add(p.pw_uid) for user in all_enabled_users: # LU-1893: skip cldiag test user if re_pattern.match(user): continue try: p = pwd.getpwnam(user) except KeyError: continue uid = p.pw_uid gid = p.pw_gid if gid in super_gids or uid in super_uids: continue return user, uid return None, None def check_users_can_enter_cagefs(): try: all_enabled_users = get_cagefs_users(raise_exception=True) except CageFSException: return ChkResult(SKIPPED, 'No users with cagefs enabled') if not all_enabled_users: return ChkResult(SKIPPED, 'No users with cagefs enabled') try: cfg = clconfpars.parse_pam_lve_config('/etc/pam.d/su') except (IOError, ValueError) as e: return ChkResult(FAILED, 'Error parsing /etc/pam.d/su config file {}'.format(e)) if cfg is None: return ChkResult(FAILED, 'pam_lve configuration is not found in /etc/pam.d/su config file') user, uid = get_cagefs_user_for_test(cfg.groups, all_enabled_users) if user is None: return ChkResult(SKIPPED, 'No users with cagefs enabled (all enabled users are in super group)') inner = ('echo -n "Logged in as: $(whoami) - $(id -u) "; ' '[ "$(id -u)" == "{0}" ] && ls /var/.cagefs').format(uid) cmd = """unset BASH_ENV; su '{0}' -s /bin/bash -c '{1}'""".format(user, inner) try: subprocess.check_output(cmd, stderr=subprocess.STDOUT, shell=True, executable='/bin/bash', text=True).strip() except subprocess.CalledProcessError as e: return ChkResult(FAILED, '{}; Output was: "{}"'.format(e, e.output.strip())) return ChkResult(OK, 'Several tested users really can enter cagefs') check_users_can_enter_cagefs.pretty_name = 'Check cagefs users can enter cagefs' def check_proxy_commands_configs_are_parsable(): try: # This will load all "*.proxy.commands" under the hood. # Currently we just try to parse them and expect that some exception # will be raised if syntax is invalid or files can't be loaded for # any reason. # More strict validation should be implemented in cagefsctl itself build_wrappers_dicts(raise_exception=True) except Exception as e: return ChkResult( FAILED, 'Proxy commands config parsing error: "{}"'.format(repr(e)) ) return ChkResult(OK, 'Syntax looks fine. Files are parsable') check_proxy_commands_configs_are_parsable.pretty_name = 'Check cagefs proxy commands configs are parsable' def check_all_virt_mp_files_syntax(): wrong = [] files = glob.glob('/var/cagefs/*/*/virt.mp') if not files: return ChkResult(SKIPPED, 'No virt.mp files found') for virt_mp in files: with open(virt_mp, 'rt') as f: conf = f.read() # virt.mp files shouldn't be empty if exists if len(conf) == 0: wrong.append(virt_mp) continue # files shouldn't start with sub-directory definitions, # at least one parent should be first, so: if conf[0] == '@': wrong.append(virt_mp) if wrong: return ChkResult(FAILED, wrong) return ChkResult(OK, 'virt.mp files syntax is fine') check_all_virt_mp_files_syntax.pretty_name = 'Check cagefs virt.mp files syntax' def check_multiphp_system_default(): def php_selector_is_disabled(): try: f = open('/var/cpanel/cpanel.config', 'r') result = 'lve_hide_selector=1\n' in f f.close() except IOError: return False return result if is_ea4_enabled() and not php_selector_is_disabled(): conf = read_cpanel_ea4_php_conf() if conf: try: # get default system php version selected via MultiPHP Manager in cPanel WHM default_php = conf['default'] # LVEMAN-1170 if not default_php.startswith('ea-php'): return ChkResult(FAILED, 'Choose one of ea-php versions instead of alt-php in cPanel MultiPHP Manager for PHP Selector to start working.') except KeyError: return ChkResult(FAILED, 'Cannot get MultiPHP system default version') return ChkResult(OK, 'MultiPHP system default PHP version is NOT alt-php. PHP Selector should work normally.') check_multiphp_system_default.pretty_name = 'Check MultiPHP system default php version' CAGEFS_CHECKERS = ( check_cagefs_mount_points_exists, check_cagefs_enabled_users_isdir, check_cagefs_disabled_users_isdir, check_cagefs_disabled_etcfs_exists, check_users_can_enter_cagefs, check_proxy_commands_configs_are_parsable, check_all_virt_mp_files_syntax, check_multiphp_system_default, ) def run_tests(): errors = [] output = [] for f in CAGEFS_CHECKERS: try: chk_res = f() res, details = chk_res.res, chk_res.msg except Exception as e: res, details = INTERNAL_TEST_ERROR, repr(e) if res == OK: output.append("{}...\n{}\n".format(f.__name__, res)) else: if res not in (SKIPPED,): errors.append(res) output.append("{}...\n{}: {}\n".format(f.__name__, res, details)) return errors, output def check(): if os.geteuid() != 0: print('This script should be run by root user') exit(1) if cldiaglib_found: runner(CAGEFS_CHECKERS) else: print('*** Starting sanity check ***\n') errors, out = run_tests() print('\n'.join(out)) print('*** There are {} errors ***'.format(len(errors))) if errors: exit(2)