/proc/self/root/proc/thread-self/root/lib/python3.9/site-packages/dnf
This explorer reads the filesystem of the server it runs on, so /workspace/user isn't present here. Browsing and the terminal still work against this server's own disk from /.
# util.py# Basic dnf utils.## Copyright (C) 2012-2016 Red Hat, Inc.## This copyrighted material is made available to anyone wishing to use,# modify, copy, or redistribute it subject to the terms and conditions of# the GNU General Public License v.2, or (at your option) any later version.# This program is distributed in the hope that it will be useful, but WITHOUT# ANY WARRANTY expressed or implied, including the implied warranties of# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General# Public License for more details. You should have received a copy of the# GNU General Public License along with this program; if not, write to the# Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA# 02110-1301, USA. Any Red Hat trademarks that are incorporated in the# source code or documentation are not subject to the GNU General Public# License and may only be used or replicated with the express permission of# Red Hat, Inc.# from __future__ import print_functionfrom __future__ import absolute_importfrom __future__ import unicode_literals from .pycomp import PY3, basestringfrom dnf.i18n import _, ucdimport argparseimport dnfimport dnf.callbackimport dnf.constimport dnf.pycompimport errnoimport functoolsimport hawkeyimport itertoolsimport localeimport loggingimport osimport pwdimport shutilimport sysimport tempfileimport timeimport libdnf.repoimport libdnf.transaction logger = logging.getLogger('dnf') MAIN_PROG = argparse.ArgumentParser().prog if argparse.ArgumentParser().prog == "yum" else "dnf"MAIN_PROG_UPPER = MAIN_PROG.upper() """DNF Utilities.""" def _parse_specs(namespace, values): """ Categorize :param values list into packages, groups and filenames :param namespace: argparse.Namespace, where specs will be stored :param values: list of specs, whether packages ('foo') or groups/modules ('@bar') or filenames ('*.rmp', 'http://*', ...) To access packages use: specs.pkg_specs, to access groups use: specs.grp_specs, to access filenames use: specs.filenames """ setattr(namespace, "filenames", []) setattr(namespace, "grp_specs", []) setattr(namespace, "pkg_specs", []) tmp_set = set() for value in values: if value in tmp_set: continue tmp_set.add(value) schemes = dnf.pycomp.urlparse.urlparse(value)[0] if value.endswith('.rpm'): namespace.filenames.append(value) elif schemes and schemes in ('http', 'ftp', 'file', 'https'): namespace.filenames.append(value) elif value.startswith('@'): namespace.grp_specs.append(value[1:]) else: namespace.pkg_specs.append(value) def _urlopen_progress(url, conf, progress=None): if progress is None: progress = dnf.callback.NullDownloadProgress() pload = dnf.repo.RemoteRPMPayload(url, conf, progress) est_remote_size = sum([pload.download_size]) progress.start(1, est_remote_size) targets = [pload._librepo_target()] try: libdnf.repo.PackageTarget.downloadPackages(libdnf.repo.VectorPPackageTarget(targets), True) except RuntimeError as e: if conf.strict: raise IOError(str(e)) logger.error(str(e)) return pload.local_path def _urlopen(url, conf=None, repo=None, mode='w+b', **kwargs): """ Open the specified absolute url, return a file object which respects proxy setting even for non-repo downloads """ if PY3 and 'b' not in mode: kwargs.setdefault('encoding', 'utf-8') fo = tempfile.NamedTemporaryFile(mode, **kwargs) try: if repo: repo._repo.downloadUrl(url, fo.fileno()) else: libdnf.repo.Downloader.downloadURL(conf._config if conf else None, url, fo.fileno()) except RuntimeError as e: raise IOError(str(e)) fo.seek(0) return fo def rtrim(s, r): if s.endswith(r): s = s[:-len(r)] return s def am_i_root(): # used by ansible (lib/ansible/modules/packaging/os/dnf.py) return os.geteuid() == 0 def clear_dir(path): """Remove all files and dirs under `path` Also see rm_rf() """ for entry in os.listdir(path): contained_path = os.path.join(path, entry) rm_rf(contained_path) def ensure_dir(dname): # used by ansible (lib/ansible/modules/packaging/os/dnf.py) try: os.makedirs(dname, mode=0o755) except OSError as e: if e.errno != errno.EEXIST or not os.path.isdir(dname): raise e def split_path(path): """ Split path by path separators. Use os.path.join() to join the path back to string. """ result = [] head = path while True: head, tail = os.path.split(head) if not tail: if head or not result: # if not result: make sure result is [""] so os.path.join(*result) can be called result.insert(0, head) break result.insert(0, tail) return result def empty(iterable): try: l = len(iterable) except TypeError: l = len(list(iterable)) return l == 0 def first(iterable): """Returns the first item from an iterable or None if it has no elements.""" it = iter(iterable) try: return next(it) except StopIteration: return None def first_not_none(iterable): it = iter(iterable) try: return next(item for item in it if item is not None) except StopIteration: return None def file_age(fn): return time.time() - file_timestamp(fn) def file_timestamp(fn): return os.stat(fn).st_mtime def get_effective_login(): try: return pwd.getpwuid(os.geteuid())[0] except KeyError: return "UID: %s" % os.geteuid() def get_in(dct, keys, not_found): """Like dict.get() for nested dicts.""" for k in keys: dct = dct.get(k) if dct is None: return not_found return dct def group_by_filter(fn, iterable): def splitter(acc, item): acc[not bool(fn(item))].append(item) return acc return functools.reduce(splitter, iterable, ([], [])) def insert_if(item, iterable, condition): """Insert an item into an iterable by a condition.""" for original_item in iterable: if condition(original_item): yield item yield original_item def is_exhausted(iterator): """Test whether an iterator is exhausted.""" try: next(iterator) except StopIteration: return True else: return False def is_glob_pattern(pattern): if is_string_type(pattern): pattern = [pattern] return (isinstance(pattern, list) and any(set(p) & set("*[?") for p in pattern)) def is_string_type(obj): if PY3: return isinstance(obj, str) else: return isinstance(obj, basestring) def lazyattr(attrname): """Decorator to get lazy attribute initialization. Composes with @property. Force reinitialization by deleting the <attrname>. """ def get_decorated(fn): def cached_getter(obj): try: return getattr(obj, attrname) except AttributeError: val = fn(obj) setattr(obj, attrname, val) return val return cached_getter return get_decorated def mapall(fn, *seq): """Like functools.map(), but return a list instead of an iterator. This means all side effects of fn take place even without iterating the result. """ return list(map(fn, *seq)) def normalize_time(timestamp): """Convert time into locale aware datetime string object.""" t = time.strftime("%c", time.localtime(timestamp)) if not dnf.pycomp.PY3: current_locale_setting = locale.getlocale()[1] if current_locale_setting: t = t.decode(current_locale_setting) return t def on_ac_power(): """Decide whether we are on line power. Returns True if we are on line power, False if not, None if it can not be decided. """ try: ps_folder = "/sys/class/power_supply" ac_nodes = [node for node in os.listdir(ps_folder) if node.startswith("AC")] if len(ac_nodes) > 0: ac_node = ac_nodes[0] with open("{}/{}/online".format(ps_folder, ac_node)) as ac_status: data = ac_status.read() return int(data) == 1 return None except (IOError, ValueError): return None def on_metered_connection(): """Decide whether we are on metered connection. Returns: True: if on metered connection False: if not None: if it can not be decided """ try: import dbus except ImportError: return None try: bus = dbus.SystemBus() proxy = bus.get_object("org.freedesktop.NetworkManager", "/org/freedesktop/NetworkManager") iface = dbus.Interface(proxy, "org.freedesktop.DBus.Properties") metered = iface.Get("org.freedesktop.NetworkManager", "Metered") except dbus.DBusException: return None if metered == 0: # NM_METERED_UNKNOWN return None elif metered in (1, 3): # NM_METERED_YES, NM_METERED_GUESS_YES return True elif metered in (2, 4): # NM_METERED_NO, NM_METERED_GUESS_NO return False else: # Something undocumented (at least at this moment) raise ValueError("Unknown value for metered property: %r", metered) def partition(pred, iterable): """Use a predicate to partition entries into false entries and true entries. Credit: Python library itertools' documentation. """ t1, t2 = itertools.tee(iterable) return dnf.pycomp.filterfalse(pred, t1), filter(pred, t2) def rm_rf(path): try: shutil.rmtree(path) except OSError: pass def split_by(iterable, condition): """Split an iterable into tuples by a condition. Inserts a separator before each item which meets the condition and then cuts the iterable by these separators. """ separator = object() # A unique object. # Create a function returning tuple of objects before the separator. def next_subsequence(it): return tuple(itertools.takewhile(lambda e: e != separator, it)) # Mark each place where the condition is met by the separator. marked = insert_if(separator, iterable, condition) # The 1st subsequence may be empty if the 1st item meets the condition. yield next_subsequence(marked) while True: subsequence = next_subsequence(marked) if not subsequence: break yield subsequence def strip_prefix(s, prefix): if s.startswith(prefix): return s[len(prefix):] return None def touch(path, no_create=False): """Create an empty file if it doesn't exist or bump it's timestamps. If no_create is True only bumps the timestamps. """ if no_create or os.access(path, os.F_OK): return os.utime(path, None) with open(path, 'a'): pass def _terminal_messenger(tp='write', msg="", out=sys.stdout): try: if tp == 'write': out.write(msg) elif tp == 'flush': out.flush() elif tp == 'write_flush': out.write(msg) out.flush() elif tp == 'print': print(msg, file=out) else: raise ValueError('Unsupported type: ' + tp) except IOError as e: logger.critical('{}: {}'.format(type(e).__name__, ucd(e))) pass def _format_resolve_problems(resolve_problems): """ Format string about problems in resolve :param resolve_problems: list with list of strings (output of goal.problem_rules()) :return: string """ msg = "" count_problems = (len(resolve_problems) > 1) for i, rs in enumerate(resolve_problems, start=1): if count_problems: msg += "\n " + _("Problem") + " %d: " % i else: msg += "\n " + _("Problem") + ": " msg += "\n - ".join(rs) return msg def _te_nevra(te): nevra = te.N() + '-' if te.E() is not None and te.E() != '0': nevra += te.E() + ':' return nevra + te.V() + '-' + te.R() + '.' + te.A() def _log_rpm_trans_with_swdb(rpm_transaction, swdb_transaction): logger.debug("Logging transaction elements") for rpm_el in rpm_transaction: tsi = rpm_el.Key() tsi_state = None if tsi is not None: tsi_state = tsi.state msg = "RPM element: '{}', Key(): '{}', Key state: '{}', Failed() '{}': ".format( _te_nevra(rpm_el), tsi, tsi_state, rpm_el.Failed()) logger.debug(msg) for tsi in swdb_transaction: msg = "SWDB element: '{}', State: '{}', Action: '{}', From repo: '{}', Reason: '{}', " \ "Get reason: '{}'".format(str(tsi), tsi.state, tsi.action, tsi.from_repo, tsi.reason, tsi.get_reason()) logger.debug(msg) def _sync_rpm_trans_with_swdb(rpm_transaction, swdb_transaction): revert_actions = {libdnf.transaction.TransactionItemAction_DOWNGRADED, libdnf.transaction.TransactionItemAction_OBSOLETED, libdnf.transaction.TransactionItemAction_REMOVE, libdnf.transaction.TransactionItemAction_UPGRADED, libdnf.transaction.TransactionItemAction_REINSTALLED} cached_tsi = [tsi for tsi in swdb_transaction] el_not_found = False error = False for rpm_el in rpm_transaction: te_nevra = _te_nevra(rpm_el) tsi = rpm_el.Key() if tsi is None or not hasattr(tsi, "pkg"): for tsi_candidate in cached_tsi: if tsi_candidate.state != libdnf.transaction.TransactionItemState_UNKNOWN: continue if tsi_candidate.action not in revert_actions: continue if str(tsi_candidate) == te_nevra: tsi = tsi_candidate break if tsi is None or not hasattr(tsi, "pkg"): logger.critical(_("TransactionItem not found for key: {}").format(te_nevra)) el_not_found = True continue if rpm_el.Failed(): tsi.state = libdnf.transaction.TransactionItemState_ERROR error = True else: tsi.state = libdnf.transaction.TransactionItemState_DONE for tsi in cached_tsi: if tsi.state == libdnf.transaction.TransactionItemState_UNKNOWN: logger.critical(_("TransactionSWDBItem not found for key: {}").format(str(tsi))) el_not_found = True if error: logger.debug(_('Errors occurred during transaction.')) if el_not_found: _log_rpm_trans_with_swdb(rpm_transaction, cached_tsi) class tmpdir(object): # used by subscription-manager (src/dnf-plugins/product-id.py) def __init__(self): prefix = '%s-' % dnf.const.PREFIX self.path = tempfile.mkdtemp(prefix=prefix) def __enter__(self): return self.path def __exit__(self, exc_type, exc_value, traceback): rm_rf(self.path) class Bunch(dict): """Dictionary with attribute accessing syntax. In DNF, prefer using this over dnf.yum.misc.GenericHolder. Credit: Alex Martelli, Doug Hudgeon """ def __init__(self, *args, **kwds): super(Bunch, self).__init__(*args, **kwds) self.__dict__ = self def __hash__(self): return id(self) class MultiCallList(list): def __init__(self, iterable): super(MultiCallList, self).__init__() self.extend(iterable) def __getattr__(self, what): def fn(*args, **kwargs): def call_what(v): method = getattr(v, what) return method(*args, **kwargs) return list(map(call_what, self)) return fn def __setattr__(self, what, val): def setter(item): setattr(item, what, val) return list(map(setter, self)) def _make_lists(transaction): b = Bunch({ 'downgraded': [], 'erased': [], 'erased_clean': [], 'erased_dep': [], 'installed': [], 'installed_group': [], 'installed_dep': [], 'installed_weak': [], 'reinstalled': [], 'upgraded': [], 'failed': [], }) for tsi in transaction: if tsi.state == libdnf.transaction.TransactionItemState_ERROR: b.failed.append(tsi) elif tsi.action == libdnf.transaction.TransactionItemAction_DOWNGRADE: b.downgraded.append(tsi) elif tsi.action == libdnf.transaction.TransactionItemAction_INSTALL: if tsi.reason == libdnf.transaction.TransactionItemReason_GROUP: b.installed_group.append(tsi) elif tsi.reason == libdnf.transaction.TransactionItemReason_DEPENDENCY: b.installed_dep.append(tsi) elif tsi.reason == libdnf.transaction.TransactionItemReason_WEAK_DEPENDENCY: b.installed_weak.append(tsi) else: # TransactionItemReason_USER b.installed.append(tsi) elif tsi.action == libdnf.transaction.TransactionItemAction_REINSTALL: b.reinstalled.append(tsi) elif tsi.action == libdnf.transaction.TransactionItemAction_REMOVE: if tsi.reason == libdnf.transaction.TransactionItemReason_CLEAN: b.erased_clean.append(tsi) elif tsi.reason == libdnf.transaction.TransactionItemReason_DEPENDENCY: b.erased_dep.append(tsi) else: b.erased.append(tsi) elif tsi.action == libdnf.transaction.TransactionItemAction_UPGRADE: b.upgraded.append(tsi) return b def _post_transaction_output(base, transaction, action_callback): """Returns a human-readable summary of the results of the transaction. :param action_callback: function generating output for specific action. It takes two parameters - action as a string and list of affected packages for this action :return: a list of lines containing a human-readable summary of the results of the transaction """ def _tsi_or_pkg_nevra_cmp(item1, item2): """Compares two transaction items or packages by nevra. Used as a fallback when tsi does not contain package object. """ ret = (item1.name > item2.name) - (item1.name < item2.name) if ret != 0: return ret nevra1 = hawkey.NEVRA(name=item1.name, epoch=item1.epoch, version=item1.version, release=item1.release, arch=item1.arch) nevra2 = hawkey.NEVRA(name=item2.name, epoch=item2.epoch, version=item2.version, release=item2.release, arch=item2.arch) ret = nevra1.evr_cmp(nevra2, base.sack) if ret != 0: return ret return (item1.arch > item2.arch) - (item1.arch < item2.arch) list_bunch = dnf.util._make_lists(transaction) skipped_conflicts, skipped_broken = base._skipped_packages( report_problems=False, transaction=transaction) skipped = skipped_conflicts.union(skipped_broken) out = [] for (action, tsis) in [(_('Upgraded'), list_bunch.upgraded), (_('Downgraded'), list_bunch.downgraded), (_('Installed'), list_bunch.installed + list_bunch.installed_group + list_bunch.installed_weak + list_bunch.installed_dep), (_('Reinstalled'), list_bunch.reinstalled), (_('Skipped'), skipped), (_('Removed'), list_bunch.erased + list_bunch.erased_dep + list_bunch.erased_clean), (_('Failed'), list_bunch.failed)]: out.extend(action_callback( action, sorted(tsis, key=functools.cmp_to_key(_tsi_or_pkg_nevra_cmp)))) return out def _name_unset_wrapper(input_name): # returns <name-unset> for everything that evaluates to False (None, empty..) return input_name if input_name else _("<name-unset>")