Server IP : 184.154.167.98 / Your IP : 3.142.54.169 Web Server : Apache System : Linux pink.dnsnetservice.com 4.18.0-553.22.1.lve.1.el8.x86_64 #1 SMP Tue Oct 8 15:52:54 UTC 2024 x86_64 User : puertode ( 1767) PHP Version : 7.2.34 Disable Function : NONE MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : ON | Sudo : ON | Pkexec : ON Directory : /lib/python3.6/site-packages/pydbus/ |
Upload File : |
from __future__ import print_function import sys, traceback from gi.repository import GLib, Gio from . import generic from .exitable import ExitableWithAliases from functools import partial from .method_call_context import MethodCallContext from .error import error_registration import logging try: from inspect import signature, Parameter except: from ._inspect3 import signature, Parameter class ObjectWrapper(ExitableWithAliases("unwrap")): __slots__ = ["object", "outargs", "readable_properties", "writable_properties"] def __init__(self, object, interfaces): self.object = object self.outargs = {} for iface in interfaces: for method in iface.methods: self.outargs[iface.name + "." + method.name] = [arg.signature for arg in method.out_args] self.readable_properties = {} self.writable_properties = {} for iface in interfaces: for prop in iface.properties: if prop.flags & Gio.DBusPropertyInfoFlags.READABLE: self.readable_properties[iface.name + "." + prop.name] = prop.signature if prop.flags & Gio.DBusPropertyInfoFlags.WRITABLE: self.writable_properties[iface.name + "." + prop.name] = prop.signature for iface in interfaces: for signal in iface.signals: s_name = signal.name def EmitSignal(iface, signal): return lambda *args: self.SignalEmitted(iface.name, signal.name, GLib.Variant("(" + "".join(s.signature for s in signal.args) + ")", args)) self._at_exit(getattr(object, signal.name).connect(EmitSignal(iface, signal)).__exit__) if "org.freedesktop.DBus.Properties" not in (iface.name for iface in interfaces): try: def onPropertiesChanged(iface, changed, invalidated): changed = {key: GLib.Variant(self.readable_properties[iface + "." + key], val) for key, val in changed.items()} args = GLib.Variant("(sa{sv}as)", (iface, changed, invalidated)) self.SignalEmitted("org.freedesktop.DBus.Properties", "PropertiesChanged", args) self._at_exit(object.PropertiesChanged.connect(onPropertiesChanged).__exit__) except AttributeError: pass SignalEmitted = generic.signal() def call_method(self, connection, sender, object_path, interface_name, method_name, parameters, invocation): try: try: outargs = self.outargs[interface_name + "." + method_name] method = getattr(self.object, method_name) except KeyError: if interface_name == "org.freedesktop.DBus.Properties": if method_name == "Get": method = self.Get outargs = ["v"] elif method_name == "GetAll": method = self.GetAll outargs = ["a{sv}"] elif method_name == "Set": method = self.Set outargs = [] else: raise else: raise sig = signature(method) kwargs = {} if "dbus_context" in sig.parameters and sig.parameters["dbus_context"].kind in (Parameter.POSITIONAL_OR_KEYWORD, Parameter.KEYWORD_ONLY): kwargs["dbus_context"] = MethodCallContext(invocation) result = method(*parameters, **kwargs) if len(outargs) == 0: invocation.return_value(None) elif len(outargs) == 1: invocation.return_value(GLib.Variant("(" + "".join(outargs) + ")", (result,))) else: invocation.return_value(GLib.Variant("(" + "".join(outargs) + ")", result)) except Exception as e: logger = logging.getLogger(__name__) logger.exception("Exception while handling %s.%s()", interface_name, method_name) if error_registration.is_registered_exception(e): name = error_registration.get_dbus_name(e) invocation.return_dbus_error(name, str(e)) else: logger.info("name is not registered") e_type = type(e).__name__ if not "." in e_type: e_type = "unknown." + e_type invocation.return_dbus_error(e_type, str(e)) def Get(self, interface_name, property_name): type = self.readable_properties[interface_name + "." + property_name] result = getattr(self.object, property_name) return GLib.Variant(type, result) def GetAll(self, interface_name): ret = {} for name, type in self.readable_properties.items(): ns, local = name.rsplit(".", 1) if ns == interface_name: ret[local] = GLib.Variant(type, getattr(self.object, local)) return ret def Set(self, interface_name, property_name, value): self.writable_properties[interface_name + "." + property_name] setattr(self.object, property_name, value) class ObjectRegistration(ExitableWithAliases("unregister")): __slots__ = () def __init__(self, bus, path, interfaces, wrapper, own_wrapper=False): if own_wrapper: self._at_exit(wrapper.__exit__) def func(interface_name, signal_name, parameters): bus.con.emit_signal(None, path, interface_name, signal_name, parameters) self._at_exit(wrapper.SignalEmitted.connect(func).__exit__) try: ids = [bus.con.register_object(path, interface, wrapper.call_method, None, None) for interface in interfaces] except TypeError as e: if str(e).startswith("argument vtable: Expected Gio.DBusInterfaceVTable"): raise Exception("GLib 2.46 is required to publish objects; it is impossible in older versions.") else: raise self._at_exit(lambda: [bus.con.unregister_object(id) for id in ids]) class RegistrationMixin: __slots__ = () def register_object(self, path, object, node_info): if node_info is None: try: node_info = type(object).dbus except AttributeError: node_info = type(object).__doc__ if type(node_info) != list and type(node_info) != tuple: node_info = [node_info] node_info = [Gio.DBusNodeInfo.new_for_xml(ni) for ni in node_info] interfaces = sum((ni.interfaces for ni in node_info), []) wrapper = ObjectWrapper(object, interfaces) return ObjectRegistration(self, path, interfaces, wrapper, own_wrapper=True)