Server IP : 184.154.167.98 / Your IP : 18.118.0.93 Web Server : Apache System : Linux pink.dnsnetservice.com 4.18.0-553.22.1.lve.1.el8.x86_64 #1 SMP Tue Oct 8 15:52:54 UTC 2024 x86_64 User : puertode ( 1767) PHP Version : 8.2.26 Disable Function : NONE MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : ON | Sudo : ON | Pkexec : ON Directory : /usr/sbin/ |
Upload File : |
#!/opt/imunify360/venv/bin/python3
"""Watchdog for the imunify360 services.

Invoked with a single positional argument ``rpc_timeout`` (seconds).  When
the resident ``imunify360`` service is running and is not applying database
migrations, the watchdog:

* restarts ``imunify360-agent`` if its socket unit/service is not active or
  the RPC socket file is missing;
* sends a ``HEALTH`` request to the generic sensor socket and restarts
  ``imunify360`` if the request keeps failing for ``rpc_timeout`` seconds or
  the reply does not report ``healthy``.
"""
import argparse
import json
import logging
import logging.handlers
import os.path
import shutil
import socket
import subprocess
import sys
import time
from pathlib import Path
from typing import Optional

from defence360agent import sentry
from defence360agent.contracts.config import GENERIC_SENSOR_SOCKET_PATH
from defence360agent.utils import Scope, is_centos6_or_cloudlinux6

# Do not let errors raised inside logging handlers surface on stderr.
logging.raiseExceptions = False

CONNECT_TIMEOUT = 10
REQUEST_TIMEOUT = 60
RETRY_DELAY = 10
MIGRATION_TIMEOUT = 4 * 60 * 60  # 4 hours
IMUNIFY360 = "imunify360"
IMUNIFY360_AGENT = "imunify360-agent"
IMUNIFY360_AGENT_SOCKET = "imunify360-agent.socket"
AGENT_SOCKET_PATH = "/var/run/defence360agent/simple_rpc.sock"
SERVICE = "service"
SUBPROCESS_TIMEOUT = 1800
RESTART = "restart"
STATUS = "status"
SHOW = "show"
AGENT_IN_MIGRATION_STATE = "Applying database migrations"


def run(cmd, *, timeout=SUBPROCESS_TIMEOUT, check=False, **kwargs):
    """Run *cmd* with *timeout* without raising TimeoutExpired.

    All other keyword arguments are forwarded to ``subprocess.run``.

    On timeout, return CompletedProcess with returncode equal to None;
    its ``stdout``/``stderr`` are whatever the TimeoutExpired exception
    carried, which may be None.
    """
    try:
        return subprocess.run(cmd, timeout=timeout, check=check, **kwargs)
    except subprocess.TimeoutExpired as e:
        return subprocess.CompletedProcess(
            e.cmd, returncode=None, stdout=e.stdout, stderr=e.stderr
        )


def service_is_running(systemctl_exec: Optional[str], name: str) -> bool:
    """Return True if ``[systemctl|service] status`` exits with 0 for *name*.

    *systemctl_exec* is the path to systemctl; when it is None the
    ``service`` command is used instead.
    """
    if systemctl_exec:
        cmd = [systemctl_exec, STATUS, name]
    else:
        cmd = [SERVICE, name, STATUS]
    cp = run(
        cmd,
        stdin=subprocess.DEVNULL,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )
    # A timed-out command has returncode None and counts as "not running".
    return cp.returncode == 0


def restart_service(systemctl_exec: Optional[str], name: str) -> None:
    """Restart service *name* with the [systemctl|service] command.

    The command's exit status is not inspected.
    """
    if systemctl_exec:
        cmd = [systemctl_exec, RESTART, name]
    else:
        cmd = [SERVICE, name, RESTART]
    run(
        cmd,
        stdin=subprocess.DEVNULL,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )


def restart_imunify360(systemctl_exec: Optional[str]) -> None:
    """Restart resident imunify360 services"""
    restart_service(systemctl_exec, IMUNIFY360)


def restart_imunify360_agent(systemctl_exec: Optional[str]) -> None:
    """Restart non-resident imunify360 services"""
    restart_service(systemctl_exec, IMUNIFY360_AGENT)


def setup_logging(level) -> logging.Logger:
    """Return the watchdog logger, wired to syslog, and configure sentry."""
    logger = logging.getLogger("imunify360-watchdog")
    logger.setLevel(level)
    handler = logging.handlers.SysLogHandler("/dev/log")
    handler.setFormatter(logging.Formatter("%(name)s: %(message)s"))
    logger.addHandler(handler)
    sentry.configure_sentry()
    return logger


def send_to_generic_socket(_msg):
    """Send *_msg* as one JSON line to the generic sensor socket.

    Returns the decoded JSON reply (the first line of the response).

    Raises ``socket.timeout`` when the request exceeds REQUEST_TIMEOUT,
    ``ValueError`` on an empty reply, and whatever the socket/JSON layers
    raise for connection or decoding failures.
    """
    with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
        sock.settimeout(CONNECT_TIMEOUT)
        sock.connect(GENERIC_SENSOR_SOCKET_PATH)
        msg = json.dumps(_msg).encode() + b"\n"
        start_time = time.monotonic()
        sock.settimeout(REQUEST_TIMEOUT)
        sock.sendall(msg)
        # Reading the reply may only use what is left of the request budget.
        remaining_time = start_time + REQUEST_TIMEOUT - time.monotonic()
        if remaining_time <= 0:
            raise socket.timeout()
        sock.settimeout(remaining_time)
        with sock.makefile(encoding="utf-8") as file:
            response = file.readline()
        if not response:
            raise ValueError("Empty response from socket")
        return json.loads(response)


def check_agent_socket_alive(systemctl_exec: Optional[str]) -> bool:
    """Return True if the agent's service (or socket unit) is reported active.

    On CentOS 6 / CloudLinux 6 the agent service itself is checked;
    elsewhere the ``imunify360-agent.socket`` unit is checked.
    """
    if is_centos6_or_cloudlinux6():
        unit = IMUNIFY360_AGENT
    else:
        unit = IMUNIFY360_AGENT_SOCKET
    return service_is_running(systemctl_exec, unit)


def generic_sensor_with_retries(rpc_timeout: int) -> Optional[dict]:
    """Send a HEALTH request, retrying every RETRY_DELAY seconds.

    Returns the decoded reply of the first successful request.  Once
    *rpc_timeout* seconds have elapsed since the first attempt, the last
    exception is re-raised instead of retrying.
    """
    # Monotonic clock: the deadline must not move when wall time is adjusted.
    start = time.monotonic()
    while True:
        try:
            return send_to_generic_socket(
                {
                    "method": "HEALTH",
                }
            )
        except Exception:
            if time.monotonic() - start >= rpc_timeout:
                raise
            time.sleep(RETRY_DELAY)


def systemctl_executable() -> Optional[str]:
    """Try to find systemctl in default PATH and return None if failed."""
    return shutil.which("systemctl", path=os.defpath)


def service_is_migrating(systemctl_exec, name, logger):
    """
    Check that service in "apply migrations" state and do not exhaust timeout

    Returns True only while the service status text contains
    AGENT_IN_MIGRATION_STATE and the time since the main process start is
    below MIGRATION_TIMEOUT.  Returns False when the status cannot be
    obtained, when the service is not migrating, or when the migration has
    been running for too long.
    """
    wanted = ("StatusText", "ExecMainStartTimestampMonotonic")
    if systemctl_exec:
        cmd = [
            systemctl_exec,
            SHOW,
            name,
            "-p",
            "StatusText",
            "-p",
            "ExecMainStartTimestampMonotonic",
        ]
    else:
        cmd = [SERVICE, name, SHOW]
    cp = run(
        cmd,
        stdin=subprocess.DEVNULL,
        stdout=subprocess.PIPE,
        stderr=subprocess.DEVNULL,
    )
    # stdout is None when the command timed out before producing output.
    output = (cp.stdout or b"").decode(errors="replace")

    # Parse only main parameters from output, other lines ignored
    # (including lines that are not in KEY=VALUE form).
    params = {}
    for line in output.splitlines():
        key, sep, value = line.partition("=")
        if sep and key in wanted:
            params[key] = value

    if AGENT_IN_MIGRATION_STATE not in params.get("StatusText", ""):
        return False

    try:
        started_at = int(params["ExecMainStartTimestampMonotonic"]) / 1e6
    except (KeyError, ValueError):
        # Without a start timestamp the duration is unknown; do not restart
        # a service that reports it is in the middle of migrations.
        logger.warning(
            "%s is migrating, but its start time is unknown", name
        )
        return True

    # The timestamp is in microseconds and is compared against
    # time.monotonic() — assumes both use the same monotonic clock.
    migration_duration = time.monotonic() - started_at
    logger.info("%s migrating for %d sec", name, migration_duration)
    if migration_duration < MIGRATION_TIMEOUT:
        return True
    logger.error("Migration took too long")
    return False


def ensure_resident_health(
    logger: logging.Logger, systemctl_exec: Optional[str], rpc_timeout: int
) -> None:
    """Restart the resident service if it is unreachable or unhealthy.

    The service is restarted when the HEALTH request keeps failing for
    *rpc_timeout* seconds, or when the reply's ``healthy`` field is falsy
    or absent.
    """
    try:
        response = generic_sensor_with_retries(rpc_timeout)
    except Exception:
        logger.exception("Restarting resident service due to RPC failures")
        restart_imunify360(systemctl_exec)
        return

    if not response.get("healthy", False):
        logger.error(
            "Restarting resident service due to health report: %s",
            response.get("why") or response.get("error"),
        )
        restart_imunify360(systemctl_exec)
    else:
        logger.info("%s is healthy: %s", IMUNIFY360, response.get("why"))


def ensure_agent_health(
    logger: logging.Logger, systemctl_exec: Optional[str]
) -> None:
    """Restart the agent if its socket is not usable.

    The agent is restarted when the status check fails, when the RPC
    socket file does not exist, or when checking raises an exception.
    """
    try:
        # `service *.sock status` returns 0 even when the socket file
        # does not exist, so the file has to be checked explicitly.
        if (
            not check_agent_socket_alive(systemctl_exec)
            or not Path(AGENT_SOCKET_PATH).exists()
        ):
            # No exception is being handled here, so use error() rather
            # than exception(), which would log a bogus empty traceback.
            logger.error("Restarting agent due to socket failures")
            restart_imunify360_agent(systemctl_exec)
    except Exception as e:
        logger.exception("Restarting agent due to %s", e)
        restart_imunify360_agent(systemctl_exec)


def main(rpc_timeout, log_level=logging.INFO):
    """Run one watchdog pass.

    Does nothing when the resident service is not running or is still
    applying database migrations; otherwise checks the agent first and
    the resident service second.
    """
    logger = setup_logging(log_level)
    systemctl_exec = systemctl_executable()

    if not service_is_running(systemctl_exec, IMUNIFY360):
        logger.info("%s is not running", IMUNIFY360)
        return
    elif service_is_migrating(systemctl_exec, IMUNIFY360, logger):
        logger.info("%s is migrating at the moment", IMUNIFY360)
        return

    ensure_agent_health(logger, systemctl_exec)
    ensure_resident_health(logger, systemctl_exec, rpc_timeout)


def parse_args() -> argparse.Namespace:
    """Parse the command line: one positional integer ``rpc_timeout``."""
    parser = argparse.ArgumentParser()
    parser.add_argument("rpc_timeout", type=int)
    return parser.parse_args()


if __name__ == "__main__":
    args = parse_args()
    main(rpc_timeout=args.rpc_timeout)