Server IP : 184.154.167.98 / Your IP : 18.223.238.38 Web Server : Apache System : Linux pink.dnsnetservice.com 4.18.0-553.22.1.lve.1.el8.x86_64 #1 SMP Tue Oct 8 15:52:54 UTC 2024 x86_64 User : puertode ( 1767) PHP Version : 7.2.34 Disable Function : NONE MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : ON | Sudo : ON | Pkexec : ON Directory : /usr/libexec/pcp/pmdas/bcc/modules/ |
Upload File : |
# # Copyright (C) 2017-2018 Marko Myllynen <myllynen@redhat.com> # Copyright (C) 2018 Andreas Gerstmayr <andreas@gerstmayr.me> # Based on the tcplife BCC tool by Brendan Gregg: # https://github.com/iovisor/bcc/blob/master/tools/tcplife.py # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # """ PCP BCC PMDA tcplife module """ # pylint: disable=invalid-name, too-few-public-methods, too-many-instance-attributes import ctypes as ct from collections import deque from threading import Lock, Thread from socket import inet_ntop, AF_INET, AF_INET6 from struct import pack from os import path from bcc import BPF from pcp.pmapi import pmUnits from cpmapi import PM_TYPE_U32, PM_TYPE_U64, PM_TYPE_STRING, PM_SEM_INSTANT from cpmapi import PM_SPACE_BYTE, PM_TIME_USEC from cpmda import PMDA_FETCH_NOVALUES from modules.pcpbcc import PCPBCCBase # # BPF program # if BPF.tracepoint_exists("sock", "inet_sock_set_state"): bpf_src = "modules/tcplife_tp.bpf" else: bpf_src = "modules/tcplife_kp.bpf" # # PCP BCC PMDA constants # MODULE = 'tcplife' BASENS = 'proc.io.net.tcp.' units_bytes = pmUnits(1, 0, 0, PM_SPACE_BYTE, 0, 0) units_usecs = pmUnits(0, 1, 0, 0, PM_TIME_USEC, 0) units_none = pmUnits(0, 0, 0, 0, 0, 0) # # PCP BCC Module # class PCPBCCModule(PCPBCCBase): """ PCP BCC tcplife module """ def __init__(self, config, log, err, proc_refresh): """ Constructor """ PCPBCCBase.__init__(self, MODULE, config, log, err) self.pids = [] self.proc_filter = None self.proc_refresh = proc_refresh self.dports = [] self.lports = [] self.session_count = 20 self.buffer_page_count = 64 for opt in self.config.options(MODULE): if opt == 'dport': self.dports = [int(port) for port in self.config.get(MODULE, opt).split(",")] self.log("Filtering on remote ports: %s." % str(self.dports)) if opt == 'lport': self.lports = [int(port) for port in self.config.get(MODULE, opt).split(",")] self.log("Filtering on local ports: %s." % str(self.lports)) if opt == 'process': self.proc_filter = self.config.get(MODULE, opt) self.update_pids(self.get_proc_info(self.proc_filter)) if opt == 'session_count': self.session_count = int(self.config.get(MODULE, opt)) if opt == 'buffer_page_count': self.buffer_page_count = int(self.config.get(MODULE, opt)) if not self.buffer_page_count or \ self.buffer_page_count & (self.buffer_page_count - 1): raise RuntimeError("Buffer page count is not power of two.") self.cache = deque(maxlen=self.session_count) self.insts = {str(i) : ct.c_int(1) for i in range(self.session_count)} self.lock = Lock() self.thread = None self.log("Using BPF source file %s." % bpf_src) # Exit hard if impossible to continue if self.bcc_version_tuple() < (0, 6, 1): raise RuntimeError("BCC 0.6.1+ is required for this module.") self.log("Initialized.") def handle_ip_event(self, data, version): """ IP event handler """ if version == 4: event = self.bpf["ipv4_events"].event(data) laddr = inet_ntop(AF_INET, pack("I", event.saddr)) daddr = inet_ntop(AF_INET, pack("I", event.daddr)) else: event = self.bpf["ipv6_events"].event(data) laddr = inet_ntop(AF_INET6, event.saddr) daddr = inet_ntop(AF_INET6, event.daddr) self.lock.acquire() self.cache.appendleft([ event.pid, event.task.decode(), laddr, event.ports >> 32, daddr, event.ports & 0xffffffff, event.tx_b, event.rx_b, event.span_us ]) self.lock.release() def handle_ipv4_event(self, _cpu, data, _size): """ IPv4 event handler """ self.handle_ip_event(data, 4) def handle_ipv6_event(self, _cpu, data, _size): """ IPv6 event handler """ self.handle_ip_event(data, 6) # pylint: disable=bad-continuation def metrics(self): """ Get metric definitions """ name = BASENS self.items = ( # Name - reserved - type - semantics - units - help (name + 'pid', None, PM_TYPE_U32, PM_SEM_INSTANT, units_none, 'PID'), (name + 'comm', None, PM_TYPE_STRING, PM_SEM_INSTANT, units_none, 'command'), (name + 'laddr', None, PM_TYPE_STRING, PM_SEM_INSTANT, units_none, 'local address'), (name + 'lport', None, PM_TYPE_U32, PM_SEM_INSTANT, units_none, 'local port'), (name + 'daddr', None, PM_TYPE_STRING, PM_SEM_INSTANT, units_none, 'destination ' 'address'), (name + 'dport', None, PM_TYPE_U32, PM_SEM_INSTANT, units_none, 'destination port'), (name + 'tx', None, PM_TYPE_U64, PM_SEM_INSTANT, units_bytes, 'transmitted data'), (name + 'rx', None, PM_TYPE_U64, PM_SEM_INSTANT, units_bytes, 'received data'), (name + 'duration', None, PM_TYPE_U32, PM_SEM_INSTANT, units_usecs, 'duration of ' 'the TCP session (from TCP_ESTABLISHED/TCP_SYN_* until TCP_CLOSE)'), ) return True, self.items def perf_buffer_lost_cb(self, lost_cnt): """ Callback for lost perf buffer events """ self.err("Lost %d events; buffer_page_count should be increased." % lost_cnt) def compile(self): """ Compile BPF """ try: if not self.pids and self.proc_filter and not self.proc_refresh: raise RuntimeError("No process to attach found.") if not self.bpf_text: with open(path.dirname(__file__) + '/../' + bpf_src) as src: self.bpf_text = src.read() if self.dports: filterp = " && ".join(["dport != %d" % port for port in self.dports]) filter_txt = "if (%s) { birth.delete(&sk); return 0; }" % filterp self.bpf_text = self.bpf_text.replace("FILTER_DPORT", filter_txt) if self.lports: filterp = " && ".join(["lport != %d" % port for port in self.lports]) filter_txt = "if (%s) { birth.delete(&sk); return 0; }" % filterp self.bpf_text = self.bpf_text.replace("FILTER_LPORT", filter_txt) if not self.pids and self.proc_filter and self.proc_refresh: self.log("No process to attach found, activation postponed.") return bpf_text = self.apply_pid_filter(self.bpf_text, self.pids, False) bpf_text = bpf_text.replace('FILTER_PID', '') bpf_text = bpf_text.replace('FILTER_DPORT', '') bpf_text = bpf_text.replace('FILTER_LPORT', '') bpf_text = bpf_text.replace('FILTER_FAMILY', '') if self.debug: self.log("BPF to be compiled:\n" + bpf_text.strip()) self.bpf = BPF(text=bpf_text) self.bpf["ipv4_events"].open_perf_buffer(self.handle_ipv4_event, page_cnt=self.buffer_page_count, lost_cb=self.perf_buffer_lost_cb) self.bpf["ipv6_events"].open_perf_buffer(self.handle_ipv6_event, page_cnt=self.buffer_page_count, lost_cb=self.perf_buffer_lost_cb) self.thread = Thread(name="bpfpoller", target=self.perf_buffer_poller) self.thread.daemon = True self.thread.start() self.log("Compiled.") except Exception as error: # pylint: disable=broad-except self.bpf = None self.err(str(error)) self.err("Module NOT active!") raise def refresh(self): """ Refresh BPF data """ if self.bpf is None: return None return self.insts def bpfdata(self, item, inst): """ Return BPF data as PCP metric value """ try: key = int(self.pmdaIndom.inst_name_lookup(inst)) self.lock.acquire() value = self.cache[key][item] self.lock.release() return [value, 1] except Exception: # pylint: disable=broad-except self.lock.release() return [PMDA_FETCH_NOVALUES, 0]