Server IP : 184.154.167.98 / Your IP : 18.119.107.208 Web Server : Apache System : Linux pink.dnsnetservice.com 4.18.0-553.22.1.lve.1.el8.x86_64 #1 SMP Tue Oct 8 15:52:54 UTC 2024 x86_64 User : puertode ( 1767) PHP Version : 8.2.26 Disable Function : NONE MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : ON | Sudo : ON | Pkexec : ON Directory : /usr/libexec/imunify360/ |
Upload File : |
#!/opt/imunify360/venv/bin/python3
"""This script is a cPanel hook script for several Filemanager related events.

Based on:

* https://documentation.cpanel.net/display/DD/Guide+to+Standardized+Hooks
* https://documentation.cpanel.net/display/DD/Guide+to+Standardized+Hooks+-+Hook+Action+Code
"""  # noqa: E501
import logging
import json
import signal
import socket
import sys
from tempfile import NamedTemporaryFile
from typing import Callable, List, TextIO

from defence360agent import sentry
from im360 import aibolit_job

logger = logging.getLogger("cpanel_fileman_hook")

# _PATH_HOOK contains the location of this hook on the server
_PATH_HOOK = "/usr/libexec/imunify360/cpanel_fileman_hook"

# Hook registrations emitted by the "--describe" action: one entry per
# Fileman event, each pointing back at this script with the matching flag.
# NOTE(review): field meanings (blocking, escalateprivs, stage, ...) are
# defined by the cPanel Standardized Hooks guide linked above -- confirm
# there before changing any value.
DESCRIBE_DATA = [
    {
        "blocking": 1,
        "escalateprivs": 0,
        "category": "Cpanel",
        "event": "UAPI::Fileman::upload_files",
        "stage": "pre",
        "hook": _PATH_HOOK + " --upload",
        "exectype": "script",
    },
    {
        "blocking": 1,
        "escalateprivs": 0,
        "category": "Cpanel",
        "event": "UAPI::Fileman::save_file_content",
        "stage": "pre",
        "hook": _PATH_HOOK + " --save",
        "exectype": "script",
    },
    {
        "blocking": 1,
        "escalateprivs": 0,
        "category": "Cpanel",
        "event": "Api2::Fileman::savefile",
        "stage": "pre",
        "hook": _PATH_HOOK + " --save",
        "exectype": "script",
    },
]


class Context:
    """Bundle of everything an action needs from the outside world.

    Keeping the streams, the argument vector and the scanner callback in
    one object lets the actions be driven with substitutes instead of the
    real process streams and the real scanner.
    """

    def __init__(
        self,
        stdin: TextIO,
        stdout: TextIO,
        stderr: TextIO,
        args: List[str],
        checker: Callable[[str], bool],
    ):
        # stream the hook payload (JSON) is read from
        self.stdin = stdin
        # stream the hook verdict / describe data is written to
        self.stdout = stdout
        # stream usage and internal errors are reported on
        self.stderr = stderr
        # argument vector; args[1] is expected to be "--<action>"
        self.args = args
        # callable taking a file path, returning True when the file is allowed
        self.checker = checker


def status_text(
    allowed: bool, method=None, filename=None, folder=None, user=None
) -> str:
    """Return the verdict string to write to stdout.

    Returns ``"1"`` when *allowed* is true, ``"0 BAILOUT malware detected"``
    otherwise.  On a rejection a detailed record (method, file name, folder
    and user) is also logged; those four arguments are used for that log
    record only.
    """
    if not allowed:
        logger.info(
            "0 BAILOUT malware detected when %s '%s' in %s for user %s",
            method,
            filename,
            folder,
            user,
        )
    return "1" if allowed else "0 BAILOUT malware detected"


def status_code(allowed: bool) -> int:
    """Return the exit status for an action: always 0.

    *allowed* is intentionally ignored.
    NOTE(review): presumably cPanel takes the verdict from the text printed
    by status_text() rather than from the exit status -- confirm against
    the hook action code guide.
    """
    return 0


def describe_action(ctx: Context) -> int:
    """Write DESCRIBE_DATA as JSON to ``ctx.stdout`` and return 0."""
    ctx.stdout.write(json.dumps(DESCRIBE_DATA))
    return 0


def check_upload(ctx: Context) -> int:
    """Handle ``--upload``: scan the uploaded file named in the payload.

    Reads a JSON document from ``ctx.stdin`` and uses its ``"data"``
    member.  The first key of ``data["args"]`` ending in ``"-key"`` selects
    the upload: the same key without that suffix holds the path that is
    passed to ``ctx.checker``.  When no such key exists nothing is scanned
    and the upload is allowed.  The verdict text is written to
    ``ctx.stdout``.

    Returns the value of status_code().
    """
    logger.info("upload action")
    suffix = "-key"
    path = ""
    filename = ""
    allowed = True
    data = json.loads(ctx.stdin.read())["data"]
    args = data["args"]
    for key in args:
        if key.endswith(suffix):
            path = args[key[: -len(suffix)]]
            # "file-eicar.com-key":"file-0"
            # [len("file-"):-len(suffix)] -> eicar.com
            filename = key[len("file-"):-len(suffix)]
            break
    if path != "":
        allowed = ctx.checker(path)
    ctx.stdout.write(
        status_text(allowed, "upload", filename, args.get("dir"), data["user"])
    )
    return status_code(allowed)


def check_save(ctx: Context) -> int:
    """Handle ``--save``: scan the content that is about to be saved.

    Reads a JSON document from ``ctx.stdin`` and uses its ``"data"``
    member.  A non-empty ``data["args"]["content"]`` is written to a
    temporary file whose path is passed to ``ctx.checker``; empty or
    missing content is allowed without scanning.  The verdict text is
    written to ``ctx.stdout``.

    Returns the value of status_code().
    """
    logger.info("save action")
    allowed = True
    data = json.loads(ctx.stdin.read())["data"]
    args = data["args"]
    content = args.get("content")
    if content:
        # Explicit UTF-8 so that non-ASCII content does not fail to encode
        # when the hook happens to run under a non-UTF-8 locale.
        with NamedTemporaryFile(mode="w", encoding="utf-8") as ntf:
            ntf.write(content)
            # the checker reads the file by name, so the data must be on
            # disk before it is called
            ntf.flush()
            allowed = ctx.checker(ntf.name)
    ctx.stdout.write(
        status_text(
            allowed,
            "save",
            # the payload uses either of two key names for the file and
            # for its directory
            args.get("filename") or args.get("file"),
            args.get("dir") or args.get("path"),
            data["user"],
        )
    )
    return status_code(allowed)


# Maps the action name (argv[1] without the leading "--") to its handler.
KNOWN_ACTIONS = {
    "describe": describe_action,
    "upload": check_upload,
    "save": check_save,
}


def aibolit_checker(file_to_scan: str) -> bool:
    """Submit *file_to_scan* to aibolit and wait for its verdict.

    Returns False when a SIGUSR1 sent by root arrives, True when a SIGUSR2
    sent by root arrives, and True when no verdict arrives within the time
    budget derived from ``aibolit_job.UPLOAD_TIMEOUT`` (i.e. a timeout does
    not block the file).  Signals from any other uid are ignored.
    """
    # FOLLOWING IS MOSTLY COPIED FROM modsec_scan_real.py
    resident_dir_path = aibolit_job.RESIDENT_DIR
    # to include the import time, we could read the start time of the
    # process https://gist.github.com/westhood/1073585
    remaining_time = aibolit_job.create_remaining_time_func(
        aibolit_job.UPLOAD_TIMEOUT
    )

    # signals we'll be waiting for from aibolit
    sigset = {signal.SIGUSR1, signal.SIGUSR2}
    # block the signal in all threads
    signal.pthread_sigmask(signal.SIG_BLOCK, sigset)

    # submit the uploaded file for scanning
    # create PID.upload_job in the resident dir
    aibolit_job.create_upload_job(
        files=[file_to_scan],
        resident_dir_path=resident_dir_path,
        timeout=remaining_time(),
    )
    logger.info("file %s is sent for scanning", file_to_scan)

    # notify aibolit about the new job
    aibolit_job.notify_aibolit_start_it_if_necessary(timeout=remaining_time())

    # wait for response
    while True:
        # use sigtimedwait() instead of signal() to get the uid
        # note: ignore a possible race on retry inside sigtimedwait() on
        # receiving a signal (see sigtimedwait()'s Python docs)
        si = signal.sigtimedwait(sigset, remaining_time())
        if si is None:  # timed out
            logger.warning("timed out while scanning %s", file_to_scan)
            return True
        if si.si_uid == 0:  # the signal is from root
            if si.si_signo == signal.SIGUSR1:
                return False
            elif si.si_signo == signal.SIGUSR2:
                return True
            else:
                assert 0, "shouldn't happen"  # pragma: no cover


def setup_logging() -> None:
    """Setup logging carefully for ossec to capture.

    When the hook prints logs on stderr, cpanel captures logs and prints
    them to /usr/local/cpanel/logs/error_log file. We need to make sure the
    logging format matches the syslog format for ossec to decode it
    perfectly.
    """
    hostname = socket.getfqdn()
    logger.setLevel(logging.DEBUG)
    # StreamHandler() without arguments writes to sys.stderr
    handler = logging.StreamHandler()
    formatter = logging.Formatter(
        f"%(asctime)s {hostname} %(name)s[%(process)d]: %(message)s",
        datefmt="%b %d %H:%M:%S",
    )
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    sentry.configure_sentry()


def do_main(ctx: Context) -> int:
    """Dispatch ``ctx.args[1]`` (``--<action>``) to its handler.

    Returns the handler's result, or 1 after printing a message to
    ``ctx.stderr`` when the argument is missing, does not start with
    ``--``, or names an action that is not in KNOWN_ACTIONS.
    """
    if len(ctx.args) < 2:
        print("No command is given.", file=ctx.stderr)
        return 1
    if not ctx.args[1].startswith("--"):
        print("Wrong argument:", ctx.args[1], file=ctx.stderr)
        return 1
    action = ctx.args[1][2:]
    if action not in KNOWN_ACTIONS:
        print("Unknown action:", action, file=ctx.stderr)
        return 1
    return KNOWN_ACTIONS[action](ctx)


def main(ctx: Context) -> int:
    """Run do_main(), turning any exception into exit status 1.

    The exception is printed to ``ctx.stderr`` and logged with its
    traceback; nothing is written to ``ctx.stdout`` by this handler.
    """
    try:
        return do_main(ctx)
    except Exception as e:
        print("1 Exception:", e, file=ctx.stderr)
        logger.exception("internal error: %s", e)
        return 1


if __name__ == "__main__":
    setup_logging()
    # First line of log does not get its own line but prepended with
    # cpanel logging format, e.g.(/usr/local/cpanel/logs/error_log):
    #
    # [2024-07-16 07:42:40 +0000] info [uapi] STDERR output from hook: /usr/libexec/imunify360/cpanel_fileman_hook --upload
    # [2024-07-16 07:42:40 +0000] info [uapi] Jul 16 07:42:39 cl7x64.cltest.com cpanel_fileman_hook[101676]: Starting imunify fileman hook
    # Jul 16 07:42:39 cl7x64.cltest.com cpanel_fileman_hook[101676]: upload action
    # Jul 16 07:42:39 cl7x64.cltest.com cpanel_fileman_hook[101676]: file /home/user228/tmp/Cpanel_Form_file.upload.bb6355b0 is sent for scanning
    # Jul 16 07:42:40 cl7x64.cltest.com cpanel_fileman_hook[101676]: 0 BAILOUT malware detected when upload 'eicar.com' in public_html for user user228
    # Jul 16 07:42:40 cl7x64.cltest.com cpanel_fileman_hook[101676]: exiting with code 0
    #
    # [2024-07-16 07:42:40 +0000] info [uapi] End STDERR from hook
    #
    # This is the reason we issue a log during startup
    logger.info("Starting imunify fileman hook")
    ctx = Context(
        sys.stdin,
        sys.stdout,
        sys.stderr,
        sys.argv,
        aibolit_checker,
    )
    code = main(ctx)
    ctx.stdout.flush()
    ctx.stderr.flush()
    logger.info("exiting with code %s", code)
    sys.exit(code)