Server IP : 184.154.167.98 / Your IP : 3.129.20.62 Web Server : Apache System : Linux pink.dnsnetservice.com 4.18.0-553.22.1.lve.1.el8.x86_64 #1 SMP Tue Oct 8 15:52:54 UTC 2024 x86_64 User : puertode ( 1767) PHP Version : 8.2.26 Disable Function : NONE MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : ON | Sudo : ON | Pkexec : ON Directory : /usr/sbin/ |
Upload File : |
#!/opt/imunify360/venv/bin/python3 import glob import ipaddress import os import random import re import shutil import string import subprocess import sys class Panel: def __init__(self, host_addresses): self.host_addresses = set() if host_addresses: self.host_addresses.update(host_addresses) @classmethod def detect(cls): if not cls.detect_path: return False return os.path.exists(cls.detect_path) def get_destinations(self): data = {} for ip, domains in self.additional.items(): for domain in domains: data[domain] = ip return data def get_ssl_mapping(self): data = {} for ip, domains in self.additional.items(): data[ip] = sorted(domains)[0] if self.main_ip and self.first_domain: data[self.main_ip] = self.first_domain return data class cPanel(Panel): detect_path = '/usr/local/cpanel/cpanel' domain_ips_path = '/etc/domainips' main_address_path = '/var/cpanel/mainip' domain_users_path = '/etc/domainusers' def prepare(self): all_domains = self._get_domains() self.main_ip = self._get_main_ip() self.additional = self._get_additional() if not all_domains: self.first_domain = None return diff = sorted( all_domains.difference( *self.additional.values())) self.first_domain = diff[0] if diff else None @classmethod def _get_domains(cls): domains = set() try: with open(cls.domain_users_path) as f: for line in f: _, domain = [i.strip() for i in line.split(':')] domains.add(domain) except Exception: pass return domains def _get_additional(self): addresses = {} if not os.path.exists(self.domain_ips_path): return addresses try: with open(self.domain_ips_path) as f: for line in f: if line.startswith('#'): continue if ':' not in line: continue ip, domain = [i.strip() for i in line.split(':', 1)] if not ip: continue if self.host_addresses and ip not in self.host_addresses: continue if ip not in addresses: addresses[ip] = [] addresses[ip].append(domain) except Exception: pass return addresses @classmethod def _get_main_ip(cls): try: with open(cls.main_address_path) as f: return f.read().strip() except Exception: return class Plesk(Panel): detect_path = '/usr/sbin/plesk' passwd_path = '/etc/psa/.psa.shadow' @classmethod def _get_plesk_passwd(cls): try: with open(cls.passwd_path) as f: return f.read() except Exception: return @classmethod def _find_main_ip(cls, ip_domains): """ Attempt to detect main IP address """ # Find 'shared' ip with Plesk utility ip_info = cls._get_ip_info() if ip_info: for ip in ip_domains.keys(): if ip_info.get(ip): # Current IP maps to True return ip # OK. Found it. Exit # Let the main ip be IP with max number of domains max_count = 0 max_ip = None for ip, domains in ip_domains.items(): count = len(domains) if count > max_count: max_count = count max_ip = ip return max_ip def _get_domain_ips(self): data = {} passwd = self._get_plesk_passwd() if not passwd: return data env = {'MYSQL_PWD': passwd} query = ("""SELECT dom.name, iad.ip_address FROM domains """ """dom LEFT JOIN DomainServices d ON (dom.id = d.dom_id """ """AND d.type = 'web') LEFT JOIN IpAddressesCollections ia """ """ON ia.ipCollectionId = d.ipCollectionId LEFT JOIN """ """IP_Addresses iad ON iad.id = ia.ipAddressId""") cmd = ['mysql', '-u', 'admin', '-Dpsa', '-e', query] try: p = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True, env=env) out, err = p.communicate() except (subprocess.CalledProcessError, FileNotFoundError): return data if not out: return data for line in out.splitlines(): domain, ip = line.split() if not ip: continue if self.host_addresses and ip not in self.host_addresses: continue if ip not in data: data[ip] = [] data[ip].append(domain) return data @classmethod def _get_ip_info(cls): data = {} patt = re.compile( r"""[^:]+: # Interface name, e.g 'eth0:' (?P<ip>[^/]+) # all symbols before forward slash """, re.VERBOSE) cmd = ['plesk', 'bin', 'ipmanage', '--ip_list'] try: out = subprocess.check_output( cmd, stderr=subprocess.DEVNULL, universal_newlines=True) except subprocess.CalledProcessError: return data if not out: return data for line in out.splitlines(): splitted = line.split() if not splitted: continue if len(splitted) < 3: continue match = patt.match(splitted[2]) # third field contains IP if match and splitted[1] in ('S', 'E'): data[match.group('ip')] = True if splitted[1] == 'S' else False return data def prepare(self): all_domains = self._get_domain_ips() self.main_ip = self._find_main_ip(all_domains) if not all_domains or not self.main_ip: self.first_domain = None self.additional = {} return shared_ips = sorted(all_domains.get(self.main_ip, tuple())) self.first_domain = shared_ips[0] if shared_ips else None self.additional = {k: v for k, v in all_domains.items() if k != self.main_ip} class DirectAdmin(Panel): detect_path = '/usr/local/directadmin/custombuild/build' glob_path = '/usr/local/directadmin/data/users/*/domains/*.conf' @classmethod def _get_paths(cls): paths = [] for path in glob.iglob(cls.glob_path): paths.append(path) return paths @staticmethod def _read_config(path): domain, ip = None, None domain_found, ip_found = False, False with open(path) as f: for line in f: if domain_found and ip_found: break if line.startswith('#'): continue if '=' not in line: continue key, value = [i.strip() for i in line.split('=', 1)] if key == 'domain': domain = value domain_found = True elif key == 'ip': ip = value ip_found = True else: continue return ip, domain @staticmethod def _find_main_ip(ip_map): """ The IP address with maximum domains is the main one """ max_count = 0 max_ip = None for ip, domains in ip_map.items(): count = len(domains) if count > max_count: max_count = count max_ip = ip return max_ip @classmethod def _get_domains(cls): domains = {} for path in cls._get_paths(): try: ip, domain = cls._read_config(path) except UnicodeDecodeError: continue if ip and domain: if ip not in domains: domains[ip] = [] domains[ip].append(domain) return domains def prepare(self): all_domains = self._get_domains() self.main_ip = self._find_main_ip(all_domains) if not all_domains or not self.main_ip: self.first_domain = None self.additional = {} return shared_ips = sorted(all_domains.get(self.main_ip, tuple())) self.first_domain = shared_ips[0] if shared_ips else None self.additional = {k: v for k, v in all_domains.items() if k != self.main_ip} class AddressHandler: map_conf = '/etc/imunify360-webshield/backend-destinations.conf' map_file = '/etc/imunify360-webshield/default-destinations.dat' @staticmethod def _generate(length=8): sample = string.ascii_letters + string.digits return ''.join(random.sample(sample, length)) @staticmethod def _get_panel(): for panel in cPanel, Plesk, DirectAdmin: if panel.detect(): return panel @staticmethod def _get_ip_addresses(): addresses = set() patt = re.compile( r"""(?:\d+:\s?)? # number (e.g. '1:') and optional space (?P<if>\S+) # interface name (e.g. 'eth0') \s+? # space(s) inet6?\s # word 'inet' or 'inet6' (?P<ip>(?: # start IP capturing \d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3} # for IPv4 | # or [0-9a-fA-F:]+) # for IPv6 ) # end capturing (?:/(?P<mask>\d{1,3}))? # capture mask (e.g.'/24'), if any """, re.VERBOSE) p = subprocess.Popen(['ip', '-o', 'address', 'show'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True) out, err = p.communicate() if not out: return for line in out.splitlines(): m = patt.match(line) if not m: continue iface, ip, mask = m.group('if', 'ip', 'mask') if iface == 'lo': continue if ':' in ip and ipaddress.IPv6Address(ip).is_link_local: continue addresses.add(ip) return addresses @classmethod def _save(cls, data, path, semicolon=True): if not data: return fmt = "{} {};\n" if semicolon else "{} {}\n" tmp_path = '.'.join([path, cls._generate()]) with open(tmp_path, 'w') as f: for key, val in data.items(): f.write(fmt.format(key, val)) shutil.move(tmp_path, path) @classmethod def run(cls): addresses = cls._get_ip_addresses() if len(addresses) < 2: sys.stderr.write('No additional IP addresses found\n') return panel = cls._get_panel() if panel is None: sys.stderr.write('We cannot use unknown hosting panels. Skip\n') return p = panel(addresses) p.prepare() cls._save(p.get_ssl_mapping(), cls.map_file, False) if __name__ == '__main__': AddressHandler.run()