Server IP : 184.154.167.98 / Your IP : 3.145.105.116 Web Server : Apache System : Linux pink.dnsnetservice.com 4.18.0-553.22.1.lve.1.el8.x86_64 #1 SMP Tue Oct 8 15:52:54 UTC 2024 x86_64 User : puertode ( 1767) PHP Version : 8.2.26 Disable Function : NONE MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : ON | Sudo : ON | Pkexec : ON Directory : /usr/libexec/pcp/pmdas/bcc/modules/ |
Upload File : |
# # Copyright (C) 2018 Andreas Gerstmayr <andreas@gerstmayr.me> # Based on the tcptop BCC tool by Brendan Gregg: # https://github.com/iovisor/bcc/blob/master/tools/tcptop.py # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # """ PCP BCC PMDA tcptop module """ # pylint: disable=invalid-name, too-many-instance-attributes, too-many-locals import ctypes as ct from collections import namedtuple from threading import Lock, Thread from socket import inet_ntop, AF_INET, AF_INET6 from struct import pack from time import sleep from os import path from bcc import BPF from pcp.pmapi import pmUnits from cpmapi import PM_TYPE_U32, PM_TYPE_U64, PM_TYPE_STRING, PM_SEM_INSTANT, PM_SPACE_BYTE from cpmda import PMDA_FETCH_NOVALUES from modules.pcpbcc import PCPBCCBase # # BPF program # bpf_src = "modules/tcptop.bpf" # # PCP BCC PMDA constants # MODULE = 'tcptop' METRIC = 'proc.io.net.tcptop.' units_bytes = pmUnits(1, 0, 0, PM_SPACE_BYTE, 0, 0) units_none = pmUnits(0, 0, 0, 0, 0, 0) TCPSessionKey = namedtuple('TCPSession', ['pid', 'laddr', 'lport', 'daddr', 'dport']) # # PCP BCC Module # class PCPBCCModule(PCPBCCBase): """ PCP BCC tcptop module """ def __init__(self, config, log, err, proc_refresh): """ Constructor """ PCPBCCBase.__init__(self, MODULE, config, log, err) self.pids = [] self.proc_filter = None self.proc_refresh = proc_refresh self.interval = 1 self.conn_count = 20 for opt in self.config.options(MODULE): if opt == 'interval': self.interval = int(self.config.get(MODULE, opt)) if opt == 'conn_count': self.conn_count = int(self.config.get(MODULE, opt)) if opt == 'process': self.proc_filter = self.config.get(MODULE, opt) self.update_pids(self.get_proc_info(self.proc_filter)) self.cache = None self.insts = {str(i) : ct.c_int(1) for i in range(self.conn_count)} self.lock = Lock() self.thread = None # Exit hard if impossible to continue if self.bcc_version() == "0.6.1": raise RuntimeError("BCC 0.6.1 bug makes it incompatible with this module.") self.log("Initialized.") # pylint: disable=line-too-long def metrics(self): """ Get metric definitions """ name = METRIC self.items = ( # Name - reserved - type - semantics - units - help (name + 'pid', None, PM_TYPE_U32, PM_SEM_INSTANT, units_none, 'PID'), (name + 'comm', None, PM_TYPE_STRING, PM_SEM_INSTANT, units_none, 'command'), (name + 'laddr', None, PM_TYPE_STRING, PM_SEM_INSTANT, units_none, 'local address'), (name + 'lport', None, PM_TYPE_U32, PM_SEM_INSTANT, units_none, 'local port'), (name + 'daddr', None, PM_TYPE_STRING, PM_SEM_INSTANT, units_none, 'destination address'), (name + 'dport', None, PM_TYPE_U32, PM_SEM_INSTANT, units_none, 'destination port'), (name + 'rx', None, PM_TYPE_U64, PM_SEM_INSTANT, units_bytes, 'bytes received'), (name + 'tx', None, PM_TYPE_U64, PM_SEM_INSTANT, units_bytes, 'bytes sent'), ) return True, self.items def reset_cache(self): """ Reset internal cache """ self.cache = {} def undef_cache(self): """ Undefine internal cache """ self.cache = None def compile(self): """ Compile BPF """ try: if not self.pids and self.proc_filter and not self.proc_refresh: raise RuntimeError("No process to attach found.") if not self.bpf_text: with open(path.dirname(__file__) + '/../' + bpf_src) as src: self.bpf_text = src.read() if not self.pids and self.proc_filter and self.proc_refresh: self.log("No process to attach found, activation postponed.") return bpf_text = self.apply_pid_filter(self.bpf_text, self.pids, False) bpf_text = bpf_text.replace('FILTER_PID', '') bpf_text = bpf_text.replace('FILTER_FAMILY', '') bpf_text = bpf_text.replace('container_should_be_filtered()', '0') if self.debug: self.log("BPF to be compiled:\n" + bpf_text.strip()) self.reset_cache() self.bpf = BPF(text=bpf_text) if self.interval > 0: self.thread = Thread(name="bpfpoller", target=self.refresh_stats_loop) self.thread.daemon = True self.thread.start() self.log("Compiled.") except Exception as error: # pylint: disable=broad-except self.bpf = None self.undef_cache() self.err(str(error)) self.err("Module NOT active!") raise @staticmethod def pid_to_comm(pid): """ Get command of PID """ try: return open("/proc/%d/comm" % pid, "r").read().rstrip() except IOError: return str(pid) @staticmethod def ipv4_table_to_dict(table): """Build hashable dict from IPv4 BPF table""" return { TCPSessionKey( pid=k.pid, laddr=inet_ntop(AF_INET, pack("I", k.saddr)), lport=k.lport, daddr=inet_ntop(AF_INET, pack("I", k.daddr)), dport=k.dport, ): v.value for k, v in table.items() } @staticmethod def ipv6_table_to_dict(table): """Build hashable dict from IPv6 BPF table""" return { TCPSessionKey( pid=k.pid, laddr=inet_ntop(AF_INET6, k.saddr), lport=k.lport, daddr=inet_ntop(AF_INET6, k.daddr), dport=k.dport, ): v.value for k, v in table.items() } def refresh_stats(self): """ Refresh statistics from BPF table """ self.lock.acquire() self.reset_cache() self.lock.release() ipv4_send_bytes = self.bpf["ipv4_send_bytes"] ipv4_send_bytes_dict = self.ipv4_table_to_dict(ipv4_send_bytes) ipv4_recv_bytes = self.bpf["ipv4_recv_bytes"] ipv4_recv_bytes_dict = self.ipv4_table_to_dict(ipv4_recv_bytes) ipv6_send_bytes = self.bpf["ipv6_send_bytes"] ipv6_send_bytes_dict = self.ipv6_table_to_dict(ipv6_send_bytes) ipv6_recv_bytes = self.bpf["ipv6_recv_bytes"] ipv6_recv_bytes_dict = self.ipv6_table_to_dict(ipv6_recv_bytes) # Build dict of all seen keys (connections) keys = ipv4_send_bytes_dict.copy() keys.update(ipv4_recv_bytes_dict) keys.update(ipv6_send_bytes_dict) keys.update(ipv6_recv_bytes_dict) idx = 0 for k, _ in sorted(keys.items(), key=lambda kv: kv[1], reverse=True): send_bytes = 0 recv_bytes = 0 if k in ipv4_send_bytes_dict: send_bytes = ipv4_send_bytes_dict[k] if k in ipv4_recv_bytes_dict: recv_bytes = ipv4_recv_bytes_dict[k] if k in ipv6_send_bytes_dict: send_bytes = ipv6_send_bytes_dict[k] if k in ipv6_recv_bytes_dict: recv_bytes = ipv6_recv_bytes_dict[k] self.lock.acquire() self.cache[idx] = [ k.pid, self.pid_to_comm(k.pid), k.laddr, k.lport, k.daddr, k.dport, recv_bytes, send_bytes ] self.lock.release() idx += 1 if idx >= self.conn_count: break ipv4_send_bytes.clear() ipv4_recv_bytes.clear() ipv6_send_bytes.clear() ipv6_recv_bytes.clear() return idx def refresh_stats_loop(self): """ Refresh statistics thread loop """ try: bpf = self.bpf while bpf == self.bpf: cnt = self.refresh_stats() if self.debug: self.log("Fetched data from BPF table, " "current number of connections: %d." % cnt) sleep(self.interval) except Exception as error: # pylint: disable=broad-except self.err(str(error)) self.err("Failed to refresh BPF statistics!") self.log("Statistics refresher thread exiting.") def refresh(self): """ Refresh BPF data """ if self.bpf is None: return None if self.interval == 0: self.refresh_stats() return self.insts def bpfdata(self, item, inst): """ Return BPF data as PCP metric value """ try: key = int(self.pmdaIndom.inst_name_lookup(inst)) self.lock.acquire() value = self.cache[key][item] self.lock.release() return [value, 1] except Exception: # pylint: disable=broad-except self.lock.release() return [PMDA_FETCH_NOVALUES, 0]