????JFIF??x?x????'
| Server IP : 172.67.174.47  /  Your IP : 216.73.216.145 Web Server : LiteSpeed System : Linux premium151.web-hosting.com 4.18.0-553.44.1.lve.el8.x86_64 #1 SMP Thu Mar 13 14:29:12 UTC 2025 x86_64 User : tempvsty ( 647) PHP Version : 8.0.30 Disable Function : NONE MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : ON | Sudo : OFF | Pkexec : OFF Directory : /././././usr/lib/python3.6/site-packages/slip/dbus/ | 
| Upload File : | 
# -*- coding: utf-8 -*-
# slip.dbus.service -- convenience functions for using dbus-activated
# services
#
# Copyright © 2008, 2009, 2015 Red Hat, Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
# Authors:
# Nils Philippsen <nils@redhat.com>
"This module contains convenience functions for using dbus-activated services."
from __future__ import absolute_import
import dbus
import dbus.service
from six import with_metaclass
from .._wrappers import _glib as GLib
from . import polkit
__all__ = ["Object", "InterfaceType", "set_mainloop"]
# Main loop object registered through set_mainloop(); None until then.
__mainloop__ = None


def __glib_quit_cb__():
    """Default quit callback: call ``quit()`` on the registered main loop.

    The object stored by set_mainloop() is assumed to be a GLib main loop
    (anything exposing a ``quit()`` method works).
    """
    __mainloop__.quit()


# Callback run by quit_cb(); replace it with set_quit_cb().
__quit_cb__ = __glib_quit_cb__


def set_mainloop(mainloop):
    """Remember *mainloop* as the loop the default quit callback stops."""
    global __mainloop__
    __mainloop__ = mainloop


def set_quit_cb(quit_cb):
    """Install *quit_cb* (a zero-argument callable) as the quit callback."""
    global __quit_cb__
    __quit_cb__ = quit_cb


def quit_cb():
    """Run whichever quit callback is currently installed."""
    __quit_cb__()
# Keyword argument names the wrapper advertises to dbus-python for methods
# that did not declare a sender keyword / async callbacks of their own.
SENDER_KEYWORD = "__slip_dbus_service_sender__"
ASYNC_CALLBACKS = ("__slip_dbus_service_reply_cb__",
                   "__slip_dbus_service_error_cb__")


def _send_reply(reply_cb, result):
    """Hand *result* of a synchronous method to the D-Bus reply callback."""
    # reply_cb((None,)) != reply_cb()
    if result is None:
        reply_cb()
    else:
        reply_cb(result)


def wrap_method(method):
    """Wrap a D-Bus exported method with sender tracking and polkit checks.

    The returned wrapper always declares a sender keyword and a pair of async
    callbacks (the method's own if it has them, otherwise SENDER_KEYWORD and
    ASYNC_CALLBACKS), so that it can learn who called and reply later.
    Keywords the original method did not declare are removed again before it
    is invoked.

    When the call arrives over the bus (the sender keyword is present):

    * ``self.sender_seen(sender)`` is called;
    * if an action id is found (``method._slip_polkit_auth_required`` or,
      failing that, ``self.default_polkit_auth_required``), the method only
      runs once ``polkit.IsSystemBusNameAuthorizedAsync`` reports success;
      otherwise ``polkit.NotAuthorizedException`` is sent via the error
      callback;
    * results and exceptions of synchronous methods are delivered through
      the reply/error callbacks.

    Local calls (no sender keyword) invoke the method directly and return its
    value.  ``self.timeout_restart()`` is called after every completed call.

    :param method: function decorated as a D-Bus method, i.e. carrying the
        ``_dbus_sender_keyword`` and ``_dbus_async_callbacks`` attributes.
    :returns: the wrapping function, carrying the same ``_dbus_*``
        attributes, ``__name__`` and ``__doc__``.
    """
    if method._dbus_sender_keyword is not None:
        sender_keyword = method._dbus_sender_keyword
        hide_sender_keyword = False
    else:
        sender_keyword = SENDER_KEYWORD
        hide_sender_keyword = True

    if method._dbus_async_callbacks is not None:
        async_callbacks = method._dbus_async_callbacks
        method_is_async = True
    else:
        async_callbacks = ASYNC_CALLBACKS
        method_is_async = False
    hide_async_callbacks = not method_is_async

    def wrapped_method(self, *p, **k):
        sender = k.get(sender_keyword)
        if sender is not None:
            # i.e. called over the bus, not locally
            reply_cb = k[async_callbacks[0]]
            error_cb = k[async_callbacks[1]]
            if hide_sender_keyword:
                del k[sender_keyword]
            if hide_async_callbacks:
                del k[async_callbacks[0]]
                del k[async_callbacks[1]]
            self.sender_seen(sender)

        action_id = getattr(method, "_slip_polkit_auth_required",
                            getattr(self, "default_polkit_auth_required",
                                    None))

        if sender is not None and action_id:
            def reply_handler(is_auth):
                if not is_auth:
                    error_cb(polkit.NotAuthorizedException(action_id))
                elif method_is_async:
                    # k contains async callbacks, simply pass on reply_cb
                    # and error_cb
                    method(self, *p, **k)
                else:
                    # execute the synchronous method and call the reply or
                    # error callback; try/except/else reports every
                    # exception, even one that evaluates as false
                    try:
                        result = method(self, *p, **k)
                    except Exception as e:
                        error_cb(e)
                    else:
                        _send_reply(reply_cb, result)
                self.timeout_restart()

            def error_handler(error):
                error_cb(error)
                self.timeout_restart()

            polkit.IsSystemBusNameAuthorizedAsync(
                sender, action_id,
                reply_handler=reply_handler, error_handler=error_handler)
        else:
            # no action id, or run locally, no need to do anything fancy
            retval = method(self, *p, **k)
            self.timeout_restart()
            if sender is not None and not method_is_async:
                # The wrapper always declares async callbacks, and
                # dbus-python ignores the return value of such methods, so a
                # synchronous result has to be sent explicitly.  Exceptions
                # are left to propagate to the caller as before.
                _send_reply(reply_cb, retval)
            return retval

    for attr in (x for x in dir(method) if x[:6] == "_dbus_"):
        if attr == "_dbus_sender_keyword":
            wrapped_method._dbus_sender_keyword = sender_keyword
        elif attr == "_dbus_async_callbacks":
            wrapped_method._dbus_async_callbacks = async_callbacks
        else:
            setattr(wrapped_method, attr, getattr(method, attr))

    wrapped_method.__name__ = method.__name__
    wrapped_method.__doc__ = method.__doc__
    return wrapped_method
class InterfaceType(dbus.service.InterfaceType):
    """Metaclass that passes every D-Bus method through wrap_method().

    Entries of the class namespace whose ``_dbus_is_method`` attribute is
    true are replaced by their wrapped version before the class is created
    by the dbus.service metaclass.
    """

    def __new__(cls, name, bases, dct):
        exported = [key for key, value in dct.items()
                    if getattr(value, "_dbus_is_method", False)]
        for key in exported:
            dct[key] = wrap_method(dct[key])
        return super(InterfaceType, cls).__new__(cls, name, bases, dct)
class Object(with_metaclass(InterfaceType, dbus.service.Object)):
    """D-Bus service object that tracks its callers and quits when idle.

    Bookkeeping lives on the class itself (``Object.senders`` and friends),
    so it is shared by every instance.  A non-persistent object calls
    quit_cb() once its timeout fires, or a tracked caller vanishes, while no
    known senders remain.
    """

    # timeout & persistence
    persistent = False
    # Multiplied by 1000 when handed to GLib.timeout_add().
    default_duration = 5
    duration = default_duration
    # Value returned by GLib.timeout_add() for the pending timeout, if any.
    current_source = None
    # (sender, connection) pairs seen so far.
    senders = set()
    # connection -> set of sender names seen on that connection.
    connections_senders = {}
    # connection -> signal match returned by add_signal_receiver().
    connections_smobjs = {}

    # PolicyKit
    default_polkit_auth_required = None

    def __init__(
            self, conn=None, object_path=None, bus_name=None,
            persistent=None):
        """Export the object; *persistent* overrides the class default."""
        super(Object, self).__init__(conn, object_path, bus_name)
        self.persistent = (
            self.__class__.persistent if persistent is None else persistent)

    def _timeout_cb(self):
        """Timeout handler: quit when idle, otherwise reset the timer state.

        Always returns False.
        """
        if not (self.persistent or Object.senders):
            quit_cb()
        else:
            Object.current_source = None
            Object.duration = self.default_duration
        return False

    def _name_owner_changed(self, name, old_owner, new_owner):
        """NameOwnerChanged handler: forget a sender whose name was released."""
        conn = self.connection
        key = (old_owner, conn)
        if new_owner or key not in Object.senders:
            return

        Object.senders.remove(key)
        remaining = Object.connections_senders[conn]
        remaining.remove(old_owner)
        if not remaining:
            # Last sender on this connection: drop the signal match too.
            Object.connections_smobjs[conn].remove()
            del Object.connections_senders[conn]
            del Object.connections_smobjs[conn]

        if (not self.persistent and not Object.senders
                and Object.current_source is None):
            quit_cb()

    def timeout_restart(self, duration=None):
        """(Re)arm the idle timeout.

        *duration* falls back to the class ``default_duration`` when falsy;
        the shared ``Object.duration`` is only ever raised here.  Nothing is
        armed for a persistent object while senders are known.
        """
        wanted = duration or self.__class__.default_duration
        if not Object.duration or wanted > Object.duration:
            Object.duration = wanted

        if self.persistent and Object.senders:
            return

        if Object.current_source:
            GLib.source_remove(Object.current_source)
        Object.current_source = GLib.timeout_add(
            Object.duration * 1000, self._timeout_cb)

    def sender_seen(self, sender):
        """Record *sender* as a caller on this object's connection."""
        conn = self.connection
        key = (sender, conn)
        if key in Object.senders:
            return

        Object.senders.add(key)
        if conn not in Object.connections_senders:
            Object.connections_senders[conn] = set()
            # NOTE(review): the receiver is registered once per connection
            # with arg1 set to the first sender seen; confirm that name
            # changes of later senders on the same connection are delivered.
            Object.connections_smobjs[conn] = conn.add_signal_receiver(
                handler_function=self._name_owner_changed,
                signal_name='NameOwnerChanged',
                dbus_interface='org.freedesktop.DBus',
                arg1=sender)
        Object.connections_senders[conn].add(sender)