Server IP : 184.154.167.98 / Your IP : 18.227.105.110 Web Server : Apache System : Linux pink.dnsnetservice.com 4.18.0-553.22.1.lve.1.el8.x86_64 #1 SMP Tue Oct 8 15:52:54 UTC 2024 x86_64 User : puertode ( 1767) PHP Version : 8.2.26 Disable Function : NONE MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : ON | Sudo : ON | Pkexec : ON Directory : /usr/libexec/pcp/pmdas/json/ |
Upload File : |
#!/usr/bin/pmpython # -*-Python-*- # pylint: disable=too-many-public-methods, too-many-instance-attributes ''' Performance Metrics Domain Agent exporting JSON metrics. ''' # # Copyright (c) 2014-2015 Red Hat. # # This program is free software; you can redistribute it and/or modify it # under the terms of the GNU General Public License as published by the # Free Software Foundation; either version 2 of the License, or (at your # option) any later version. # # This program is distributed in the hope that it will be useful, but # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY # or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License # for more details. # import json import jsonpointer from collections import OrderedDict from pcp.pmda import PMDA, pmdaMetric, pmdaIndom import cpmapi as c_api from pcp.pmapi import pmUnits, pmContext, pmErr from ctypes import c_int import os, stat, pwd import re import traceback import subprocess import shlex # From the six module, load some python 2 vs. 3 compatibility # functions. from six import iteritems, text_type MAX_CLUSTER = 0xfff # 12 bits, see pcp/libpcp.h MAX_METRIC = 0x3ff # 10 bits, see pcp/libpcp.h MAX_INDOM = 0x7fffffff NOBODY_UID = -1 NOBODY_GID = -1 def preexec(): ''' Function to be run before running non-trusted 'data-exec' commands. ''' os.setgid(NOBODY_GID) os.setuid(NOBODY_UID) return class Metric(object): ''' Metric information class ''' __name_re = re.compile(r'^[a-zA-Z][\w_\.]+$') def __init__(self, name_prefix, cluster, pmda): self.__name = '' self.name_prefix = name_prefix self.full_name = '' self.cluster = cluster self.idx = -1 self.__pmda = pmda self.desc = '' self.type = c_api.PM_TYPE_UNKNOWN self.sem = c_api.PM_SEM_INSTANT self.pointer = None self.pmid = None self.obj = None self.indom_cache = None self.index_pointer = None self.__units_val = pmUnits(0, 0, 0, 0, 0, 0) self.__units_str = '' def log(self, string): ''' Log an informational message ''' return self.__pmda.log(string) # Make sure when setting 'name', 'full_name' also gets updated. @property def name(self): ''' Get metric name value. ''' return self.__name @name.setter def name(self, name): ''' Set metric name value. ''' # Metric names must start with an alphabetic character. The rest # of the characters must be alphanumeric or an '_'. if Metric.__name_re.match(name): self.__name = name self.full_name = "%s.%s" % (self.name_prefix, name) else: self.log("Invalid metric name '%s'" % name) raise RuntimeError("Invalid metric name '%s'" % name) # For the 'units' property, internally we store it 2 different # ways: as a text string and as a numeric value. @property def units(self): ''' Get metric units value. ''' return self.__units_str @units.setter def units(self, units): ''' Set metric units value. ''' try: # pmParseUnitsStr() doesn't handle unicode utf8_units = units.encode("utf-8") # pmParseUnitsStr() returns 2 things: a units value and a # multipler value. (units_val, dummy) = pmContext.pmParseUnitsStr(utf8_units) self.__units_val = units_val self.__units_str = units except pmErr: self.log("Ignoring invalid units '%s'" % units) self.log("%s" % traceback.format_exc()) def valid(self): ''' Is metric valid?. ''' return self.__name != '' and self.type != c_api.PM_TYPE_UNKNOWN def create(self): ''' Create the metric. Note that the metric will still need to be added to the PMDA. ''' if not self.valid(): self.log("Invalid metric") raise RuntimeError("Invalid metric") self.pmid = self.__pmda.pmid(self.cluster, self.idx) if self.indom_cache != None: self.obj = pmdaMetric(self.pmid, self.type, self.indom_cache.indom, self.sem, self.__units_val) else: self.obj = pmdaMetric(self.pmid, self.type, c_api.PM_INDOM_NULL, self.sem, self.__units_val) # Note that you can't delete individual metrics. The # pmda.{clear,reset}_metrics() functions clear out *all* metrics. class IndomCache(pmdaIndom): ''' Indom (instance domain) cache information class ''' def __init__(self, serial, max_value, pmda): self.__pmda = pmda self.serial = serial # In IndomCache.add_value, we're using 'value' as the inst # value. However, the pmdaCache routines treat the passed in # value as the 'private' field and generates its own inst # value. However, this 'private' field isn't saved and # restored, so it isn't very useful for our purposes. # # To get around this, we'll use an OrderedDict so that the # dictionary order should match up with the inst order. # (Another way to fix this problem would be to go ahead and # call pmdaCacheStore() in IndomCache.add_value(), but that # fix would require more api calls.) self.__values = OrderedDict() # '__names_by_values' is the inversion of '__values'. self.__names_by_values = {} # The indom cache has a notion of "inactive" values (values # we've seen before, but are not in the current fetch) and # "active" values (values in the current fetch). Seting the # active state from python is a bit tricky. '__active_values' # contains a dictionary of active values. self.__active_values = OrderedDict() pmdaIndom.__init__(self, pmda.indom(self.serial), self.__values) try: self.__pmda.add_indom(self) except KeyError: # If we've seen this indom before, it will already be # present in the pmda, so replace it. self.__pmda.replace_indom(self, self.__values) self.__maxval = max_value self.cache_resize(max_value) self.__nextval = 0 @property def indom(self): ''' Get cache's indom. ''' return self.it_indom def log(self, string): ''' Log an informational message ''' return self.__pmda.log(string) def add_value(self, name, value=c_api.PM_IN_NULL): ''' Add a value to the indom ''' # PMDA.replace_indom() wants a dictionary, indexed by # indom string value. PMDA.replace_indom() doesn't really # care what is stored at that string value. We're storing the # instance there. if value == c_api.PM_IN_NULL: value = self.next_value() if self.__pmda.debug: self.log("Adding ('%s', %d) to the cache" % (name, value)) self.__values[name] = c_int(value) if value >= self.__nextval: self.__nextval = value + 1 self.__names_by_values[value] = name def set_active(self, name): ''' Mark a indom as active. ''' if name not in self.__values: raise KeyError(name) self.__active_values[name] = self.__values[name] def lookup_name(self, name): ''' Lookup name in an indom cache and return its associated value. ''' if name not in self.__values: raise KeyError(name) valueobj = self.__values[name] return valueobj.value def lookup_value(self, value): ''' Lookup a value in an indom cache and return its associated name. ''' # We could call an api function here (pmda.inst_lookup() which # calls pmdaCacheLookup()), but we can handle this in python # by using the inverted dictionary. if value not in self.__names_by_values: raise KeyError(value) return self.__names_by_values[value] def refresh(self): ''' Update and save the indom cache. ''' self.__pmda.replace_indom(self, self.__values) # Note that set_dict_instances() saves the cache to disk. self.set_dict_instances(self.it_indom, self.__values) # If we've got active values, we want to call # set_dict_instances() on them. This will leave all the items # in the cache, but mark the ones in the __active_values # dictionary as "active". Everything else will be marked as # "inactive". if len(self.__active_values) > 0: self.set_dict_instances(self.it_indom, self.__active_values) self.__active_values.clear() def load(self): ''' Load indom cache values. ''' if self.__pmda.debug: self.log("Loading cache %d..." % self.serial) try: # Notice we're ignoring cache_load() errors. The biggest # one we're ignoring is a non-existent cache. self.cache_load() except pmErr: return for (inst, name) in self: self.add_value(name, inst) def next_value(self): ''' Return next value to be allocated. ''' if self.__nextval > self.__maxval: raise ValueError("Indom cache reached max value.") value = self.__nextval self.__nextval += 1 return value def len(self): ''' Return cache size. ''' return len(self.__values) class JsonSource(object): ''' JSON Source class. Contains all metrics and data needed by a single JSON source. ''' def __init__(self, path, pmda, trusted): self.__path = path self.__pmda = pmda # cluster, metric_cache_idx, and indom_cache_idx get filled in later. self.__cluster = -1 self.__metric_cache_idx = -1 self.__indom_cache_idx = -1 self.__metric_cache = None self.__indom_cache = None self.__array_indexes = {} # Note that this is the default root name. It can be # overridden with the metadata 'prefix' attribute. self.__root_name = os.path.basename(path) # Note that this is the default data exec command. It can be # overridden with the metadata 'data-exec' attribute. self.__data_exec = "" # Note that this is the default data path. It can be # overridden with the metadata 'data-path' attribute. self.__data_path = "%s/data.json" % path # JSON sources that come from "directory_list" aren't # "trusted", and any command that needs to be run to get JSON # data is run as user "nobody". # # JSON sources that come from "trusted_directory_list' are # trusted, and any command that needs to be run to get JSON # data is run as user "root". self.__trusted = trusted self.__metadata_path = "%s/%s" % (path, pmda.metadata_name) self.__metadata = {} self.__json_data = {} self.__metrics = {} self.__metrics_by_name = {} self.__lastfetch = 0 # Here we need to load the metadata and preparse it, in case # it changes the source name. self.__load_json_metadata() self.__preparse_metadata() def log(self, string): ''' Log an informational message ''' return self.__pmda.log(string) @property def path(self): ''' Get JSON source path value. ''' return self.__path @property def name(self): ''' Get JSON source name value. This defaults to the base name of the directory where the JSON source was found, but can be overridden by the metadata file. ''' return self.__root_name @property def cluster(self): ''' Returns the source's cluster id. ''' return self.__cluster @cluster.setter def cluster(self, cluster): ''' Sets the source's cluster id. ''' self.__cluster = cluster # Note that the cache indexes are laid out like this: # 0: cluster cache (cluster 0 is for the static metrics, # cluster 1 is for the first JSON source) # 1: metric cache for JSON source #1 # 2: indom cache for JSON source #1 # 3: metric cache for JSON source #2 # 4: indom cache for JSON source #2 # 5: metric cache for JSON source #3 # 6: indom cache for JSON source #3 # ... self.__metric_cache_idx = (self.__cluster * 2) - 1 self.__indom_cache_idx = self.__metric_cache_idx + 1 # Now that we know the cluster id, try to load the metric # cache and indom cache. Note that if they aren't present, # that's OK - this must be a new JSON source. self.__metric_cache = IndomCache(self.__metric_cache_idx, MAX_METRIC, self.__pmda) self.__metric_cache.load() self.__indom_cache = IndomCache(self.__indom_cache_idx, MAX_INDOM, self.__pmda) self.__indom_cache.load() def __load_json_metadata(self): ''' Load the JSON metadata file for this JSON source. ''' self.__metadata = {} try: fobj = open(self.__metadata_path) except IOError: self.log("Couldn't open JSON metadata file: %s" % self.__metadata_path) self.log("%s" % traceback.format_exc()) return try: self.__metadata = json.load(fobj) except ValueError: self.log("Couldn't parse JSON metadata from %s" % self.__metadata_path) self.log("%s" % traceback.format_exc()) fobj.close() def __load_json_data(self): ''' Load the JSON data file for this JSON source. ''' self.__lastfetch = self.__pmda.numfetch self.__json_data = {} if self.__data_exec != "": if not self.__trusted and (NOBODY_UID == -1 or NOBODY_GID == -1): self.log("Couldn't run JSON data command: %s" % self.__data_exec) self.log("Couldn't find user 'nobody'") return if self.__pmda.debug: self.log("About to run data-exec command '%s'" % self.__data_exec) try: args = shlex.split(self.__data_exec) # If this data source didn't come from a "trusted" # directory, we have to setuid/setguid to user # "nobody" before running the command by using the # 'preexec_fn'. if not self.__trusted: pobj = subprocess.Popen(args, preexec_fn=preexec, close_fds=True, stdout=subprocess.PIPE) else: pobj = subprocess.Popen(args, close_fds=True, stdout=subprocess.PIPE) (out, dummy) = pobj.communicate() if pobj.returncode != 0: self.log("Warning: data-exec command '%s' returned" " a non-zero return code: %d" \ % (self.__data_exec, pobj.returncode)) except (OSError, ValueError): self.log("Couldn't run JSON data command: %s" % self.__data_exec) self.log("%s" % traceback.format_exc()) return try: self.__json_data = json.loads(out) except ValueError: self.log("Couldn't parse JSON data from command output '%s'" % self.__data_exec) self.log("%s" % traceback.format_exc()) else: if self.__pmda.debug: self.log("Found data-path %s" % self.__data_path) try: fobj = open(self.__data_path) except IOError: self.log("Couldn't open JSON data file: %s" % self.__data_path) self.log("%s" % traceback.format_exc()) return try: self.__json_data = json.load(fobj) except ValueError: self.log("Couldn't parse JSON data from %s" % self.__data_path) self.log("%s" % traceback.format_exc()) fobj.close() def load(self): ''' Load the JSON metadata and data files for this JSON source, then create metrics based on the JSON. ''' if self.__pmda.debug: self.log("Loading JSON source %s" % self.__root_name) # Note that we're loading the metadata in the init function, # so we can update the source name. self.__load_json_data() # If either loading the metadata or data failed, quit. if self.__metadata == {} or self.__json_data == {}: self.cleanup() return # Parse the metadata, creating metrics as needed. try: self.__parse_metadata() except TypeError: self.log("Couldn't parse JSON metadata") self.log("%s" % traceback.format_exc()) # Update the indom list (after we've parsed the metadata). self.__refresh_indoms() def refresh_json_data(self): ''' Reload the JSON data and update indoms. ''' # Load the JSON data (not the metadata). self.__load_json_data() # Update the indom list. self.__refresh_indoms() def cleanup(self): ''' Cleanup JSON source data. ''' self.__metadata = {} self.__json_data = {} self.__metrics = {} def __refresh_indoms(self): ''' Refresh the list of indoms. ''' # Notice we never delete indoms, we just keep adding. self.__array_indexes.clear() for (dummy, metric_info) in iteritems(self.__metrics): # Skip non-arrays. if metric_info.index_pointer == None: continue try: metrics_array = metric_info.pointer.resolve(self.__json_data) # Loop through all the array items, updating the indom # list with any new values. Also remember the array # index where we found a particular indom, to make # retrieval easy. index = 0 for item in metrics_array: indom_value = metric_info.index_pointer.resolve(item) full_name = "%s.%s" % (metric_info.name, indom_value) self.__array_indexes[full_name] = index index += 1 try: dummy = self.__indom_cache.lookup_name(indom_value) except KeyError: # This indom value wasn't found in the indom # cache. Add it. self.__indom_cache.add_value(indom_value) # Mark both old values and new values that we've # seen in this fetch operation as active. self.__indom_cache.set_active(indom_value) except KeyError: self.log("Error while refreshing indom for array %s" % metric_info.name) self.log("%s" % traceback.format_exc()) continue self.__indom_cache.refresh() def __add_metric(self, metric_info): ''' Create and add a metric to the pmda. ''' metric_info.create() # Add the metric to the pmda (unless it is an array metric). if metric_info.type != c_api.PM_TYPE_NOSUPPORT: self.__pmda.add_metric(metric_info.full_name, metric_info.obj, metric_info.desc) self.__metrics[metric_info.idx] = metric_info self.__metrics_by_name[metric_info.name] = metric_info def __parse_array_metadata(self, array_name, metrics_array): ''' Parse a JSON array metadata. ''' # Process the array's metrics array. metric_prefix = "%s.%s" % (self.__pmda.pmda_name, self.__root_name) for item in metrics_array: metric_info = Metric(metric_prefix, self.cluster, self.__pmda) metric_info.indom_cache = self.__indom_cache # # 'name' (required): Sanity check it and save it. # if 'name' not in item: self.log("Skipping array metric that has no name") del metric_info continue value = item['name'] if not isinstance(value, text_type): self.log("Invalid metadata 'name' value") raise TypeError("Invalid metadata 'name' value") try: metric_info.name = "%s.%s" % (array_name, value) except RuntimeError: # If we get an invalid metric name, just skip this # metric self.log("Skipping metric with invalid name '%s.%s'" % (array_name, value)) del metric_info continue # # 'type' (required): Sanity check it and save it. # if 'type' not in item: self.log("Skipping metric '%s' that has no type" % metric_info.name) del metric_info continue value = item['type'] if not isinstance(value, text_type): self.log("Invalid metadata 'type' value for metric '%s'" % metric_info.name) del metric_info continue if value == 'string': metric_info.type = c_api.PM_TYPE_STRING elif value == 'integer': metric_info.type = c_api.PM_TYPE_64 elif value == 'double': metric_info.type = c_api.PM_TYPE_DOUBLE elif value == 'array': # We don't allow arrays to have sub-arrays. self.log("Arrays can't contain arrays") raise TypeError("Arrays can't contain arrays") else: self.log("Type attribute has unknown value '%s'" % value) raise TypeError("Type attribute has unknown value '%s'" % value) # # 'pointer' (required): Sanity check it and save it. # if 'pointer' not in item: self.log("Skipping metric '%s' that has no pointer" % metric_info.name) del metric_info continue value = item['pointer'] if not isinstance(value, text_type): self.log("Invalid metadata 'pointer' value for metric '%s'" % metric_info.name) del metric_info continue metric_info.pointer = jsonpointer.JsonPointer(value) # # 'description' (optional): Type check it and save it. # if 'description' in item: value = item['description'] if not isinstance(value, text_type): self.log("Invalid schema 'description' value for" " metric '%s'" % metric_info.name) del metric_info continue metric_info.desc = value # # 'units' (optional): Type check and save it. # if 'units' in item: value = item['units'] if not isinstance(value, text_type): self.log("Invalid schema 'units' value for metric '%s'" % metric_info.name) del metric_info continue metric_info.units = value # # 'semantics' (optional): Type check and save it. # if 'semantics' in item: value = item['semantics'] if not isinstance(value, text_type): self.log("Invalid schema 'semantics' value for metric '%s'" % metric_info.name) del metric_info continue if value == "instant" or value == "instantaneous": metric_info.sem = c_api.PM_SEM_INSTANT elif value == "counter": metric_info.sem = c_api.PM_SEM_COUNTER elif value == "discrete": metric_info.sem = c_api.PM_SEM_DISCRETE else: del metric_info continue # # Silently ignore unknown key values. # # Try looking up the metric name in the metric cache. If # it is there, reuse the metric index. try: metric_info.idx \ = self.__metric_cache.lookup_name(metric_info.name) except KeyError: try: # We couldn't find the metric name, so just grab # the next metric index value. metric_info.idx = self.__metric_cache.next_value() self.__metric_cache.add_value(metric_info.name, metric_info.idx) except ValueError: self.log("Skipping metrics in '%s' - max metric reached" % metric_prefix) break # Make sure we have everything we need. If not, just skip # this metric. if not metric_info.valid() or metric_info.pointer == None: self.log("Metadata doesn't have required" " information for the following entry: %s" % metric_info.name) del metric_info continue # We have all the required information. Add the metric. if self.__pmda.debug: self.log("Adding metric '%s'" % metric_info.name) self.__add_metric(metric_info) def __preparse_metadata(self): ''' Go through the metadata, looking for information about the data source. ''' for (key, value) in iteritems(self.__metadata): # 'prefix' (optional): Sanity check it and save it. if key == 'prefix': if not isinstance(value, text_type): self.log("Invalid metadata 'prefix' value from file %s: %s" % (self.__path, value)) continue # If a source with this name already exists, we'll # catch it later. self.__root_name = value # 'data-path' (optional): Sanity check it and save it. elif key == 'data-path': if not isinstance(value, text_type): self.log("Invalid metadata 'data-path' value from" " file %s: %s" % (self.__path, value)) continue # We won't validate the path here. When we try to load # the JSON data, we'll error if necessary. self.__data_path = value # 'data-exec' (optional): Sanity check it and save it. elif key == 'data-exec': if not isinstance(value, text_type): self.log("Invalid metadata 'data-exec' value" " from file %s: %s" % (self.__path, value)) continue # There really isn't any validation we could do # here. If we run the command and it fails, we'll log # an error then. self.__data_exec = value # For everything else, just silently ignore # it. __parse_metadata will complain if needed. def __parse_metadata(self): ''' Go through the meta, looking for information we can use to create the pcp representation of the metadata. ''' # Make sure we've got a real cluster idx at this point. if self.__cluster < 0: raise TypeError("Cluster index must be set before parsing" " metadata to create metrics.") # Look for the "metrics" array. metrics_array = None for (key, value) in iteritems(self.__metadata): # 'metrics' (required): Save it. if key == 'metrics': metrics_array = value # The following optional items were handled above in # __preparse_metadata(): 'prefix', 'data-exec' elif key == 'prefix' or key == 'data-path' or key == 'data-exec': # Silently ignore these continue # For everything else, just ignore it. else: self.log("Ignoring unknown metadata attribute" " from file %s: %s" % (self.__path, key)) if not metrics_array: self.log("Metadata has no 'metrics' array in file %s" % self.__path) # If no metrics array, just skip this source. return # Process the metrics array. metric_prefix = "%s.%s" % (self.__pmda.pmda_name, self.__root_name) for item in metrics_array: metric_info = Metric(metric_prefix, self.__cluster, self.__pmda) # # 'name' (required): Sanity check it and save it. # if 'name' not in item: self.log("Skipping metric that has no name") del metric_info continue value = item['name'] if not isinstance(value, text_type): self.log("Skipping metric with invalid metadata " "'name' value") del metric_info continue try: metric_info.name = value except RuntimeError: # If we get an invalid metric name, just skip this metric self.log("Skipping metric with invalid name '%s'" % value) del metric_info continue # # 'type' (required): Sanity check it and save it. # if 'type' not in item: self.log("Skipping metric '%s' that has no type" % metric_info.name) del metric_info continue value = item['type'] if not isinstance(value, text_type): self.log("Invalid metadata 'type' value for metric '%s'" % metric_info.name) del metric_info continue if value == 'string': metric_info.type = c_api.PM_TYPE_STRING elif value == 'integer': metric_info.type = c_api.PM_TYPE_64 elif value == 'double': metric_info.type = c_api.PM_TYPE_DOUBLE elif value == 'array': # For arrays, we have to create metrics for # each subitem in the array, using the same # indom. This happens in the 'metrics' handling # below. metric_info.type = c_api.PM_TYPE_NOSUPPORT # If we get an invalid type value, just skip this # metric. else: self.log("Type attribute for metric '%s' has unknown value '%s'" % (metric_info.name, value)) del metric_info continue # # 'pointer' (required): Sanity check it and save it. # if 'pointer' not in item: self.log("Skipping metric '%s' that has no pointer" % metric_info.name) del metric_info continue value = item['pointer'] if not isinstance(value, text_type): self.log("Invalid metadata 'pointer' value for metric '%s'" % metric_info.name) del metric_info continue metric_info.pointer = jsonpointer.JsonPointer(value) # # 'description' (optional): Type check it and save it. # if 'description' in item: value = item['description'] if not isinstance(value, text_type): self.log("Invalid schema 'description' value for" " metric '%s'" % metric_info.name) del metric_info continue metric_info.desc = value # # 'index' (required for arrays): Type check and save it. # if 'index' in item: value = item['index'] if not isinstance(value, text_type): self.log("Invalid schema 'index' value for metric '%s'" % metric_info.name) del metric_info continue if metric_info.type != c_api.PM_TYPE_NOSUPPORT: self.log("Metadata has an 'index' item" " for non-array '%s'" % metric_info.name) del metric_info continue # If we're here, we're processing an array's metadata metric_info.index_pointer = jsonpointer.JsonPointer(value) elif metric_info.type == c_api.PM_TYPE_NOSUPPORT: self.log("Metadata doesn't have a required 'index' item" " for array '%s'" % metric_info.name) del metric_info continue # # 'metrics' (required for arrays): Process it. # if 'metrics' in item: value = item['metrics'] if metric_info.type != c_api.PM_TYPE_NOSUPPORT: self.log("Metadata has an 'metrics' item" " for non-array '%s'" % metric_info.name) del metric_info continue # If we're here, we're processing an array's # metadata. For arrays, we have to create metrics for # each subitem in the array, using the same indom. self.__parse_array_metadata(metric_info.name, value) elif metric_info.type == c_api.PM_TYPE_NOSUPPORT: self.log("Metadata doesn't have a required 'metrics' item" " for array '%s'" % metric_info.name) del metric_info continue # # 'units' (optional): Type check and save it. # if 'units' in item: value = item['units'] if not isinstance(value, text_type): self.log("Invalid schema 'units' value for metric '%s'" % metric_info.name) del metric_info continue metric_info.units = value # # 'semantics' (optional): Type check and save it. # if 'semantics' in item: value = item['semantics'] if not isinstance(value, text_type): self.log("Invalid schema 'semantics' value for metric '%s'" % metric_info.name) del metric_info continue if value == "instant" or value == "instantaneous": metric_info.sem = c_api.PM_SEM_INSTANT elif value == "counter": metric_info.sem = c_api.PM_SEM_COUNTER elif value == "discrete": metric_info.sem = c_api.PM_SEM_DISCRETE else: del metric_info continue # # Silently ignore unknown key values. # # Try looking up the metric name in the metric cache. If # it is there, reuse the metric index. try: metric_info.idx \ = self.__metric_cache.lookup_name(metric_info.name) except KeyError: try: # We couldn't find the metric name, so just grab # the next metric index value. metric_info.idx = self.__metric_cache.next_value() self.__metric_cache.add_value(metric_info.name, metric_info.idx) except ValueError: self.log("Skipping metrics in '%s' - max metric reached" % metric_prefix) break # Make sure we have everything we need. If not, just skip # this metric. if not metric_info.valid() or metric_info.pointer == None: self.log("Metadata doesn't have required" " information for the following entry: %s" % metric_info.name) del metric_info continue # We have all the required information. Add the metric. if self.__pmda.debug: self.log("Adding metric '%s'" % metric_info.name) self.__add_metric(metric_info) # Now that all the metrics are created, save the metric cache. self.__metric_cache.refresh() def fetch(self, item, inst): ''' Fetch value for this item and instance. ''' # If we need to, refresh data for this source. if self.__pmda.numfetch != self.__lastfetch: self.refresh_json_data() if item not in self.__metrics: self.log("JSON source '%s' has no item %d instance %d" % (self.__root_name, item, inst)) return [c_api.PM_ERR_PMID, 0] metric_info = self.__metrics[item] # Handle array metrics. if metric_info.indom_cache != None: # Split the full name into the array name and metric (array, dummy) = metric_info.name.split('.', 2) if array not in self.__metrics_by_name: self.log("JSON source '%s' has no item '%s'" % (self.__root_name, array)) return [c_api.PM_ERR_PMID, 0] array_info = self.__metrics_by_name[array] try: # Get the entire array. metrics_array = array_info.pointer.resolve(self.__json_data) # Turn the instance id into a name. name = self.__indom_cache.lookup_value(inst) # Using that name, lookup the array index where we # found it. full_name = "%s.%s" % (array_info.name, name) try: index = self.__array_indexes[full_name] except KeyError: # This is not a real error! Our saved indom cache # might list some ancient indom strings that don't # happen to be currently represented in the data. return [c_api.PM_ERR_INST, 0] return [metric_info.pointer.resolve(metrics_array[index]), 1] except (KeyError, TypeError): self.log("Error while fetching metrics for array %s" % array_info.name) self.log("%s" % traceback.format_exc()) # Handle single-valued metrics. else: try: return [metric_info.pointer.resolve(self.__json_data), 1] except (KeyError, TypeError): self.log("Error while fetching metric %s" % metric_info.name) self.log("%s" % traceback.format_exc()) self.log("JSON source %s couldn't fetch value for item %d instance %d" % (self.__root_name, item, inst)) return [c_api.PM_ERR_TYPE, 0] def refresh_metrics(self): ''' Refresh metrics by re-adding all metrics for this JSON source to the PMDA. ''' for (dummy, metric) in iteritems(self.__metrics): # Skip array metrics. if metric.type != c_api.PM_TYPE_NOSUPPORT: self.__pmda.add_metric(metric.full_name, metric.obj, metric.desc) class JsonPMDA(PMDA): ''' JSON PMDA class ''' def __init__(self, pmda_name, domain): self.pmda_name = pmda_name PMDA.__init__(self, self.pmda_name, domain) self.connect_pmcd() self.__cluster_indom = None self.numfetch = 0 self.metadata_name = 'metadata.json' # cache_idx 0 is reserved for the cluster cache. cluster 0 is # reserved for the static metrics, so clusters that get added # to the cache start with 1. self.__cluster_cache = IndomCache(0, MAX_CLUSTER, self) # The pcp python support doesn't have a pmSetDebug() # wrapper. So, if PCP_PYTHON_DEBUG has any value, turn # debugging on. self.debug = ('PCP_PYTHON_DEBUG' in os.environ) # Try loading old cluster cache values. self.__cluster_cache.load() if self.__cluster_cache.len() == 0: # If there weren't any old cluster cache values, we've got # a bit of a problem. The indom cache only allocates # consecutive values starting at 0. We want to start at 1, # since cluster 0 is reserved for the static metrics. So, # let's add a fake entry. self.__cluster_cache.add_value('__internal__', 0) self.__metrics = {} self.__add_static_metrics() # Set up defaults for config variables. self.__directory_list = [] self.__trusted_directory_list = [] # Load config file and process config items. self.__configfile = ("%s/%s/config.json" % (pmContext.pmGetConfig('PCP_PMDAS_DIR'), pmda_name)) self.__config_data = {} self.__load_config_file() for (key, value) in iteritems(self.__config_data): if key == 'directory_list': if not isinstance(value, list): self.log("Invalid config file 'directory_list' value") continue self.__directory_list = value elif key == 'trusted_directory_list': if not isinstance(value, list): self.log("Invalid config file 'trusted_directory_list'" " value") continue self.__trusted_directory_list = value # For everything else, just ignore it. else: self.log("Ignoring unknown config option '%s'" % key) # Load all the metadata files and json data. self.sources_by_name = {} self.sources_by_root = {} self.sources_by_cluster = {} self.__load_all_json() self.set_refresh_metrics(self.__refresh_metrics) self.set_fetch_callback(self.__fetch_callback) self.set_fetch(self.__fetch) if self.debug: self.log("__init__ finished") def __load_config_file(self): ''' Load config file. ''' self.__config_data = {} try: fobj = open(self.__configfile) except IOError: self.log("Couldn't open JSON config file '%s'" % self.__configfile) self.log("%s" % traceback.format_exc()) return try: self.__config_data = json.load(fobj) except ValueError: self.log("Couldn't parse JSON config file") self.log("%s" % traceback.format_exc()) fobj.close() def __add_static_metrics(self): ''' Create all the static metrics (not from a JSON source). ''' # Create our 'nsources' metric. metric_info = Metric(self.pmda_name, 0, self) metric_info.name = 'nsources' metric_info.type = c_api.PM_TYPE_64 metric_info.desc = 'Number of JSON sources' metric_info.idx = 0 metric_info.create() self.add_metric(metric_info.full_name, metric_info.obj, metric_info.desc) self.__metrics[metric_info.idx] = metric_info # Create our 'debug' metric. metric_info = Metric(self.pmda_name, 0, self) metric_info.name = 'debug' metric_info.type = c_api.PM_TYPE_64 metric_info.desc = 'Debug logging state' metric_info.idx = 1 metric_info.create() self.add_metric(metric_info.full_name, metric_info.obj, metric_info.desc) self.__metrics[metric_info.idx] = metric_info def __remove_json_sources(self, removed_sources): ''' Clean up a list of removed JSON sources. ''' if len(removed_sources): for root in removed_sources: if self.debug: self.log("Removing JSON source '%s'" % os.path.basename(root)) self.sources_by_root[root].cleanup() cluster = self.sources_by_root[root].cluster name = self.sources_by_root[root].name del self.sources_by_root[root] del self.sources_by_name[name] del self.sources_by_cluster[cluster] def __valid_perms(self, path, desc): ''' Check path and make sure it is owned by root and isn't group or world writable. ''' try: stat_result = os.stat(path) except OSError: self.log("Error while getting information about" " %s '%s'" % (desc, path)) self.log("%s" % traceback.format_exc()) return False if stat_result.st_uid != 0 \ or (stat_result.st_mode & (stat.S_IWGRP|stat.S_IWOTH)) != 0: self.log("The %s '%s' must be owned by root and" " not group or world-writable." % (desc, path)) return False return True def __load_directory_list(self, trusted, sources_seen): ''' Load the JSON metadata/data for new sources found in either the 'trusted_directory_list' or the 'directory_list'. ''' if trusted: dir_list = self.__trusted_directory_list else: dir_list = self.__directory_list new_source_seen = False for directory in dir_list: for root, dummy, files in os.walk(directory): # Make sure we have the metadata file. if self.metadata_name in files: # If we haven't seen this source before... if root not in self.sources_by_root: # If we're processing a "trusted" directory, make # sure the root directory and metadata file are # owned by root and aren't group or # world-writable. metadata_path = "%s/%s" % (root, self.metadata_name) if trusted \ and (not self.__valid_perms(root, "directory") or not self.__valid_perms(metadata_path, "file")): continue # Create the new JsonSource. After the # JsonSource has been initialized, it has # parsed the metadata enough to optionally # change the name. source = JsonSource(root, self, trusted) # If we've already got a source with this # name, skip this one. if source.name in self.sources_by_name: self.log("Skipping source '%s' (%s) -" " already have a source with" " the same name (%s)" % (source.name, source.path, self.sources_by_name[source.name].path)) continue # Try looking up the source name in the # cluster cache. try: cluster_idx \ = self.__cluster_cache.lookup_name(source.name) if self.debug: self.log("Found %s in cluster cache: %d" % (source.name, cluster_idx)) except KeyError: try: cluster_idx = self.__cluster_cache.next_value() if self.debug: self.log("allocating new cluster idx" " %d for source %s" % (cluster_idx, source.name)) except ValueError: self.log("Skipping source '%s' -" " max cluster reached" % root) continue if self.debug: self.log("Adding source '%s', cluster_idx %d" % (source.name, cluster_idx)) self.__cluster_cache.add_value(source.name, cluster_idx) source.cluster = cluster_idx self.sources_by_name[source.name] = source self.sources_by_root[root] = source self.sources_by_cluster[cluster_idx] = source # Notice we're going ahead and loading the # JSON data for new sources. If we're # fetching, this fetch might not be for this # new data source, but we need the metadata # (for metrics) and data (for indoms). self.sources_by_root[root].load() new_source_seen = True sources_seen[root] = 1 if new_source_seen: self.__cluster_cache.refresh() def __load_all_json(self): ''' Walk the filesystem and load the JSON metadata/data for every JSON source found. ''' if self.debug: self.log("load_all_json entry") sources_seen = {} self.__load_directory_list(True, sources_seen) self.__load_directory_list(False, sources_seen) if self.debug: self.log("load_all_json exit") def __fetch(self): ''' Called once per "fetch" PDU, before callbacks. ''' # Remember how many fetches we've seen. self.numfetch += 1 def __refresh_metrics(self): ''' Called before callbacks. This allows us to update the list of metrics if needed. ''' if self.debug: self.log("__refresh_metrics: entry") # Update our list of sources. sources_seen = {} self.__load_directory_list(True, sources_seen) self.__load_directory_list(False, sources_seen) # Cleanup all removed JSON sources. removed_sources = [k for k in self.sources_by_root \ if k not in sources_seen] if len(removed_sources) > 0: if self.debug: self.log("__refresh_metrics: removed JSON sources found") self.__remove_json_sources(removed_sources) # If we've removed a JSON source, we need to recreate # the metrics from scratch, since you can't remove one # metric, you have to remove them all. if self.debug: self.log("__refresh_metrics: clearing/recreating metrics") # First clear out all existing metrics. self.clear_metrics() # Recreate our static metrics. for (dummy, metric) in iteritems(self.__metrics): self.add_metric(metric.full_name, metric.obj, metric.desc) # Now ask each JSON source to refresh its own # metrics. This recreates them from cached info we got # from the last time we read the source's metadata file. for (dummy, source) in iteritems(self.sources_by_root): source.refresh_metrics() if self.debug: self.log("__refresh_metrics: exit") def __fetch_callback(self, cluster, item, inst): ''' Main fetch callback. Returns a list of value,status (single pair) for requested pmid/inst. ''' if self.debug: self.log("**** fetch_callback: %d, %d, %d ****" % (cluster, item, inst)) if cluster not in self.sources_by_cluster: # Handle our static metrics. if cluster == 0: if item == 0: return [len(self.sources_by_cluster), 1] elif item == 1: return [self.debug, 1] if self.debug: self.log("Invalid cluster %d" % cluster) return [c_api.PM_ERR_PMID, 0] source = self.sources_by_cluster[cluster] return source.fetch(item, inst) if __name__ == '__main__': # Find the uid/gid of "nobody" for use by non-trusted "data-exec" # commands. try: PW_RECORD = pwd.getpwnam('nobody') NOBODY_UID = PW_RECORD.pw_uid NOBODY_GID = PW_RECORD.pw_gid except KeyError: # We can't log errors yet since the JsonPMDA isn't created # yet. But, we'll log an error when we try to use the # uid/gid. pass #os.environ["PCP_PYTHON_DEBUG"] = "ALL" #os.environ["PCP_PYTHON_DEBUG"] = "APPL0|LIBPMDA" #os.environ["PCP_PYTHON_DEBUG"] = "INDOM" JsonPMDA('json', 137).run()