Server IP : 184.154.167.98 / Your IP : 3.137.198.37 Web Server : Apache System : Linux pink.dnsnetservice.com 4.18.0-553.22.1.lve.1.el8.x86_64 #1 SMP Tue Oct 8 15:52:54 UTC 2024 x86_64 User : puertode ( 1767) PHP Version : 7.2.34 Disable Function : NONE MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : ON | Sudo : ON | Pkexec : ON Directory : /usr/libexec/pcp/pmdas/bcc/modules/ |
Upload File : |
# # Copyright (C) 2017-2018 Marko Myllynen <myllynen@redhat.com> # Based on the tcplife BCC tool by Brendan Gregg: # https://github.com/iovisor/bcc/blob/master/tools/tcplife.py # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # """ PCP BCC PMDA tcpperpid module """ # pylint: disable=invalid-name, too-few-public-methods, too-many-instance-attributes import ctypes as ct from threading import Lock, Thread from os import path from bcc import BPF from pcp.pmapi import pmUnits from cpmapi import PM_TYPE_U64, PM_SEM_COUNTER, PM_SPACE_BYTE from cpmda import PMDA_FETCH_NOVALUES from modules.pcpbcc import PCPBCCBase # # BPF program # if BPF.tracepoint_exists("sock", "inet_sock_set_state"): bpf_src = "modules/tcplife_tp.bpf" else: bpf_src = "modules/tcplife_kp.bpf" # Alternative, "high resolution" BPF bpf_highres = "modules/tcptop.bpf" # # PCP BCC PMDA constants # MODULE = 'tcpperpid' BASENS = 'proc.io.net.total.' units_bytes = pmUnits(1, 0, 0, PM_SPACE_BYTE, 0, 0) # # PCP BCC Module # class PCPBCCModule(PCPBCCBase): """ PCP BCC tcpperpid module """ def __init__(self, config, log, err, proc_refresh): """ Constructor """ PCPBCCBase.__init__(self, MODULE, config, log, err) self.pids = [] self.proc_filter = None self.proc_refresh = proc_refresh self.highres = False self.dports = [] self.lports = [] self.buffer_page_count = 64 for opt in self.config.options(MODULE): if opt == 'dport': self.dports = [int(port) for port in self.config.get(MODULE, opt).split(",")] self.log("Filtering on remote ports: %s." % str(self.dports)) if opt == 'lport': self.lports = [int(port) for port in self.config.get(MODULE, opt).split(",")] self.log("Filtering on local ports: %s." % str(self.lports)) if opt == 'process': self.proc_filter = self.config.get(MODULE, opt) self.update_pids(self.get_proc_info(self.proc_filter)) if opt == 'highres': self.highres = self.config.getboolean(MODULE, opt) if opt == 'buffer_page_count': self.buffer_page_count = int(self.config.get(MODULE, opt)) if not self.buffer_page_count or \ self.buffer_page_count & (self.buffer_page_count - 1): raise RuntimeError("Buffer page count is not power of two.") self.ipv4_stats = {} self.ipv6_stats = {} self.stale_ipv4_pids = [] self.stale_ipv6_pids = [] self.lock = Lock() self.thread = None # Compat with kernel < 4.16 src = bpf_src if not self.highres else bpf_highres self.log("Using BPF source file %s." % src) # Exit hard if impossible to continue if self.bcc_version_tuple() < (0, 6, 1) and not self.highres: raise RuntimeError("BCC 0.6.1+ is required for this module in non-highres mode.") self.log("Initialized.") def handle_ipv4_event(self, _cpu, data, _size): """ IPv4 event handler """ event = self.bpf["ipv4_events"].event(data) pid = str(event.pid).zfill(6) self.lock.acquire() if pid not in self.ipv4_stats: self.ipv4_stats[pid] = [int(event.tx_b), int(event.rx_b)] else: self.ipv4_stats[pid][0] += int(event.tx_b) self.ipv4_stats[pid][1] += int(event.rx_b) self.lock.release() def handle_ipv6_event(self, _cpu, data, _size): """ IPv6 event handler """ event = self.bpf["ipv6_events"].event(data) pid = str(event.pid).zfill(6) self.lock.acquire() if pid not in self.ipv6_stats: self.ipv6_stats[pid] = [int(event.tx_b), int(event.rx_b)] else: self.ipv6_stats[pid][0] += int(event.tx_b) self.ipv6_stats[pid][1] += int(event.rx_b) self.lock.release() def metrics(self): """ Get metric definitions """ name = BASENS self.items = ( # Name - reserved - type - semantics - units - help (name + 'tx', None, PM_TYPE_U64, PM_SEM_COUNTER, units_bytes, 'tcp tx per pid'), (name + 'rx', None, PM_TYPE_U64, PM_SEM_COUNTER, units_bytes, 'tcp rx per pid'), ) return True, self.items def reset_cache(self): """ Reset internal cache """ # Preserve the contents to allow providing # results when a traced process has exited def undef_cache(self): """ Undefine internal cache """ # Preserve the contents to allow providing # results when a traced process has exited def perf_buffer_lost_cb(self, lost_cnt): """ Callback for lost perf buffer events """ self.err("Lost %d events; buffer_page_count should be increased." % lost_cnt) def compile(self): """ Compile BPF """ try: if not self.pids and self.proc_filter and not self.proc_refresh: raise RuntimeError("No process to attach found.") if not self.bpf_text: bsrc = bpf_src if not self.highres else bpf_highres with open(path.dirname(__file__) + '/../' + bsrc) as src: self.bpf_text = src.read() if self.highres: self.bpf_text = self.bpf_text.replace("FILTER", "FILTER_PID") if self.dports: filterp = " && ".join(["dport != %d" % port for port in self.dports]) filter_txt = "if (%s) { birth.delete(&sk); return 0; }" % filterp self.bpf_text = self.bpf_text.replace("FILTER_DPORT", filter_txt) if self.lports: filterp = " && ".join(["lport != %d" % port for port in self.lports]) filter_txt = "if (%s) { birth.delete(&sk); return 0; }" % filterp self.bpf_text = self.bpf_text.replace("FILTER_LPORT", filter_txt) if not self.pids and self.proc_filter and self.proc_refresh: self.log("No process to attach found, activation postponed.") return bpf_text = self.apply_pid_filter(self.bpf_text, self.pids, False) bpf_text = bpf_text.replace('FILTER_PID', '') bpf_text = bpf_text.replace('FILTER_DPORT', '') bpf_text = bpf_text.replace('FILTER_LPORT', '') bpf_text = bpf_text.replace('FILTER_FAMILY', '') if self.debug: self.log("BPF to be compiled:\n" + bpf_text.strip()) self.bpf = BPF(text=bpf_text) if not self.highres: self.bpf["ipv4_events"].open_perf_buffer(self.handle_ipv4_event, page_cnt=self.buffer_page_count, lost_cb=self.perf_buffer_lost_cb) self.bpf["ipv6_events"].open_perf_buffer(self.handle_ipv6_event, page_cnt=self.buffer_page_count, lost_cb=self.perf_buffer_lost_cb) self.thread = Thread(name="bpfpoller", target=self.perf_buffer_poller) self.thread.daemon = True self.thread.start() self.log("Compiled.") except Exception as error: # pylint: disable=broad-except self.bpf = None self.err(str(error)) self.err("Module NOT active!") raise def refresh_highres(self): """ Refresh with alternative 'high resolution' BPF """ for data in "ipv4_send_bytes", "ipv4_recv_bytes", \ "ipv6_send_bytes", "ipv6_recv_bytes": counts = self.bpf[data] stats = self.ipv4_stats if "4" in data else self.ipv6_stats item = 0 if "send" in data else 1 for k, v in counts.items(): pid = str(k.pid).zfill(6) if pid not in stats: stats[pid] = [0, 0] stats[pid][item] += v.value counts.clear() def refresh(self): """ Refresh BPF data """ # We do not check for self.bpf to allow # results when a traced process has exited self.lock.acquire() self.insts = {} # Clean stale data for pid in list(self.stale_ipv4_pids): del self.ipv4_stats[pid] self.stale_ipv4_pids.remove(pid) for pid in list(self.stale_ipv6_pids): del self.ipv6_stats[pid] self.stale_ipv6_pids.remove(pid) if self.highres: self.refresh_highres() for pid in self.ipv4_stats: if not self.pid_alive(pid): self.stale_ipv4_pids.append(pid) self.insts[pid] = ct.c_int(1) for pid in self.ipv6_stats: if not self.pid_alive(pid): self.stale_ipv6_pids.append(pid) self.insts[pid] = ct.c_int(1) self.lock.release() return self.insts def bpfdata(self, item, inst): """ Return BPF data as PCP metric value """ try: self.lock.acquire() key = self.pmdaIndom.inst_name_lookup(inst) value = 0 if key in self.ipv4_stats: value += self.ipv4_stats[key][item] if key in self.ipv6_stats: value += self.ipv6_stats[key][item] self.lock.release() return [value, 1] except Exception: # pylint: disable=broad-except self.lock.release() return [PMDA_FETCH_NOVALUES, 0]