/proc/thread-self/root/proc/thread-self/root/usr/lib/python3.9/site-packages/dnf/cli
This explorer reads the filesystem of the server it runs on, so /workspace/user isn't present here. Browsing and the terminal still work against this server's own disk from /.
# Copyright 2005 Duke University# Copyright (C) 2012-2016 Red Hat, Inc.## This program is free software; you can redistribute it and/or modify# it under the terms of the GNU General Public License as published by# the Free Software Foundation; either version 2 of the License, or# (at your option) any later version.## This program is distributed in the hope that it will be useful,# but WITHOUT ANY WARRANTY; without even the implied warranty of# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the# GNU Library General Public License for more details.## You should have received a copy of the GNU General Public License# along with this program; if not, write to the Free Software# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.## Written by Seth Vidal """Command line interface yum class and related.""" from __future__ import print_functionfrom __future__ import absolute_importfrom __future__ import unicode_literals try: from collections.abc import Sequenceexcept ImportError: from collections import Sequenceimport datetimeimport loggingimport operatorimport osimport randomimport rpmimport sysimport time import hawkeyimport libdnf.transaction from . import outputfrom dnf.cli import CliErrorfrom dnf.i18n import ucd, _import dnfimport dnf.cli.aliasesimport dnf.cli.commandsimport dnf.cli.commands.aliasimport dnf.cli.commands.autoremoveimport dnf.cli.commands.checkimport dnf.cli.commands.cleanimport dnf.cli.commands.deplistimport dnf.cli.commands.distrosyncimport dnf.cli.commands.downgradeimport dnf.cli.commands.groupimport dnf.cli.commands.historyimport dnf.cli.commands.installimport dnf.cli.commands.makecacheimport dnf.cli.commands.markimport dnf.cli.commands.moduleimport dnf.cli.commands.reinstallimport dnf.cli.commands.removeimport dnf.cli.commands.repolistimport dnf.cli.commands.repoqueryimport dnf.cli.commands.searchimport dnf.cli.commands.shellimport dnf.cli.commands.swapimport dnf.cli.commands.updateinfoimport dnf.cli.commands.upgradeimport dnf.cli.commands.upgrademinimalimport dnf.cli.demandimport dnf.cli.formatimport dnf.cli.option_parserimport dnf.confimport dnf.conf.substitutionsimport dnf.constimport dnf.db.historyimport dnf.exceptionsimport dnf.loggingimport dnf.persistorimport dnf.pluginimport dnf.rpmimport dnf.sackimport dnf.transactionimport dnf.utilimport dnf.yum.misc logger = logging.getLogger('dnf') def _add_pkg_simple_list_lens(data, pkg, indent=''): """ Get the length of each pkg's column. Add that to data. This "knows" about simpleList and printVer. """ na = len(pkg.name) + 1 + len(pkg.arch) + len(indent) ver = len(pkg.evr) rid = len(pkg._from_repo) for (d, v) in (('na', na), ('ver', ver), ('rid', rid)): data[d].setdefault(v, 0) data[d][v] += 1 def _list_cmd_calc_columns(output, ypl): """ Work out the dynamic size of the columns to pass to fmtColumns. """ data = {'na' : {}, 'ver' : {}, 'rid' : {}} for lst in (ypl.installed, ypl.available, ypl.extras, ypl.autoremove, ypl.updates, ypl.recent): for pkg in lst: _add_pkg_simple_list_lens(data, pkg) if len(ypl.obsoletes) > 0: for (npkg, opkg) in ypl.obsoletesTuples: _add_pkg_simple_list_lens(data, npkg) _add_pkg_simple_list_lens(data, opkg, indent=" " * 4) data = [data['na'], data['ver'], data['rid']] columns = output.calcColumns(data, remainder_column=1) return (-columns[0], -columns[1], -columns[2]) def print_versions(pkgs, base, output): def sm_ui_time(x): return time.strftime("%c", time.gmtime(x)) rpmdb_sack = dnf.sack.rpmdb_sack(base) done = False for pkg in rpmdb_sack.query().installed().filterm(name=pkgs): if done: print("") done = True if pkg.epoch == '0': ver = '%s-%s.%s' % (pkg.version, pkg.release, pkg.arch) else: ver = '%s:%s-%s.%s' % (pkg.epoch, pkg.version, pkg.release, pkg.arch) name = output.term.bold(pkg.name) print(_(" Installed: %s-%s at %s") %(name, ver, sm_ui_time(pkg.installtime))) print(_(" Built : %s at %s") % (pkg.packager if pkg.packager else "", sm_ui_time(pkg.buildtime))) # :hawkey, no changelist information yet # print(_(" Committed: %s at %s") % (pkg.committer, # sm_ui_date(pkg.committime))) def report_module_switch(switchedModules): msg1 = _("The operation would result in switching of module '{0}' stream '{1}' to " "stream '{2}'") for moduleName, streams in switchedModules.items(): logger.warning(msg1.format(moduleName, streams[0], streams[1])) class BaseCli(dnf.Base): """This is the base class for yum cli.""" def __init__(self, conf=None): conf = conf or dnf.conf.Conf() super(BaseCli, self).__init__(conf=conf) self.output = output.Output(self, self.conf) def do_transaction(self, display=()): """Take care of package downloading, checking, user confirmation and actually running the transaction. :param display: `rpm.callback.TransactionProgress` object(s) :return: history database transaction ID or None """ if dnf.base.WITH_MODULES: if not self.conf.module_stream_switch: switchedModules = dict(self._moduleContainer.getSwitchedStreams()) if switchedModules: report_module_switch(switchedModules) msg = _("It is not possible to switch enabled streams of a module unless explicitly " "enabled via configuration option module_stream_switch.\n" "It is recommended to rather remove all installed content from the module, and " "reset the module using '{prog} module reset <module_name>' command. After " "you reset the module, you can install the other stream.").format( prog=dnf.util.MAIN_PROG) raise dnf.exceptions.Error(msg) trans = self.transaction pkg_str = self.output.list_transaction(trans) if pkg_str: logger.info(pkg_str) if trans: # Check which packages have to be downloaded install_pkgs = [] rmpkgs = [] install_only = True for tsi in trans: if tsi.action in dnf.transaction.FORWARD_ACTIONS: install_pkgs.append(tsi.pkg) elif tsi.action in dnf.transaction.BACKWARD_ACTIONS: install_only = False rmpkgs.append(tsi.pkg) # Close the connection to the rpmdb so that rpm doesn't hold the # SIGINT handler during the downloads. del self._ts # report the total download size to the user if not install_pkgs: self.output.reportRemoveSize(rmpkgs) else: self.output.reportDownloadSize(install_pkgs, install_only) if trans or self._moduleContainer.isChanged() or \ (self._history and (self._history.group or self._history.env)): # confirm with user if self.conf.downloadonly: logger.info(_("{prog} will only download packages for the transaction.").format( prog=dnf.util.MAIN_PROG_UPPER)) elif 'test' in self.conf.tsflags: logger.info(_("{prog} will only download packages, install gpg keys, and check the " "transaction.").format(prog=dnf.util.MAIN_PROG_UPPER)) if self._promptWanted(): if self.conf.assumeno or not self.output.userconfirm(): raise CliError(_("Operation aborted.")) else: logger.info(_('Nothing to do.')) return if trans: if install_pkgs: logger.info(_('Downloading Packages:')) try: total_cb = self.output.download_callback_total_cb self.download_packages(install_pkgs, self.output.progress, total_cb) except dnf.exceptions.DownloadError as e: specific = dnf.cli.format.indent_block(ucd(e)) errstr = _('Error downloading packages:') + '\n%s' % specific # setting the new line to prevent next chars being eaten up # by carriage returns print() raise dnf.exceptions.Error(errstr) # Check GPG signatures self.gpgsigcheck(install_pkgs) if self.conf.downloadonly: return if not isinstance(display, Sequence): display = [display] display = [output.CliTransactionDisplay()] + list(display) tid = super(BaseCli, self).do_transaction(display) # display last transaction (which was closed during do_transaction()) if tid is not None: trans = self.history.old([tid])[0] trans = dnf.db.group.RPMTransaction(self.history, trans._trans) else: trans = None if trans: # the post transaction summary is already written to log during # Base.do_transaction() so here only print the messages to the # user arranged in columns print() print('\n'.join(self.output.post_transaction_output(trans))) print() for tsi in trans: if tsi.state == libdnf.transaction.TransactionItemState_ERROR: raise dnf.exceptions.Error(_('Transaction failed')) return tid def gpgsigcheck(self, pkgs): """Perform GPG signature verification on the given packages, installing keys if possible. :param pkgs: a list of package objects to verify the GPG signatures of :raises: Will raise :class:`Error` if there's a problem """ error_messages = [] for po in pkgs: result, errmsg = self._sig_check_pkg(po) if result == 0: # Verified ok, or verify not req'd continue elif result == 1: ay = self.conf.assumeyes and not self.conf.assumeno if (not sys.stdin or not sys.stdin.isatty()) and not ay: raise dnf.exceptions.Error(_('Refusing to automatically import keys when running ' \ 'unattended.\nUse "-y" to override.')) # the callback here expects to be able to take options which # userconfirm really doesn't... so fake it fn = lambda x, y, z: self.output.userconfirm() try: self._get_key_for_package(po, fn) except (dnf.exceptions.Error, ValueError) as e: error_messages.append(str(e)) else: # Fatal error error_messages.append(errmsg) if error_messages: for msg in error_messages: logger.critical(msg) raise dnf.exceptions.Error(_("GPG check FAILED")) def latest_changelogs(self, package): """Return list of changelogs for package newer then installed version""" newest = None # find the date of the newest changelog for installed version of package # stored in rpmdb for mi in self._rpmconn.readonly_ts.dbMatch('name', package.name): changelogtimes = mi[rpm.RPMTAG_CHANGELOGTIME] if changelogtimes: newest = datetime.date.fromtimestamp(changelogtimes[0]) break chlogs = [chlog for chlog in package.changelogs if newest is None or chlog['timestamp'] > newest] return chlogs def format_changelog(self, changelog): """Return changelog formatted as in spec file""" chlog_str = '* %s %s\n%s\n' % ( changelog['timestamp'].strftime("%a %b %d %X %Y"), dnf.i18n.ucd(changelog['author']), dnf.i18n.ucd(changelog['text'])) return chlog_str def print_changelogs(self, packages): # group packages by src.rpm to avoid showing duplicate changelogs bysrpm = dict() for p in packages: # there are packages without source_name, use name then. bysrpm.setdefault(p.source_name or p.name, []).append(p) for source_name in sorted(bysrpm.keys()): bin_packages = bysrpm[source_name] print(_("Changelogs for {}").format(', '.join([str(pkg) for pkg in bin_packages]))) for chl in self.latest_changelogs(bin_packages[0]): print(self.format_changelog(chl)) def check_updates(self, patterns=(), reponame=None, print_=True, changelogs=False): """Check updates matching given *patterns* in selected repository.""" ypl = self.returnPkgLists('upgrades', patterns, reponame=reponame) if self.conf.obsoletes or self.conf.verbose: typl = self.returnPkgLists('obsoletes', patterns, reponame=reponame) ypl.obsoletes = typl.obsoletes ypl.obsoletesTuples = typl.obsoletesTuples if print_: columns = _list_cmd_calc_columns(self.output, ypl) if len(ypl.updates) > 0: local_pkgs = {} highlight = self.output.term.MODE['bold'] if highlight: # Do the local/remote split we get in "yum updates" for po in sorted(ypl.updates): local = po.localPkg() if os.path.exists(local) and po.verifyLocalPkg(): local_pkgs[(po.name, po.arch)] = po cul = self.conf.color_update_local cur = self.conf.color_update_remote self.output.listPkgs(ypl.updates, '', outputType='list', highlight_na=local_pkgs, columns=columns, highlight_modes={'=' : cul, 'not in' : cur}) if changelogs: self.print_changelogs(ypl.updates) if len(ypl.obsoletes) > 0: print(_('Obsoleting Packages')) # The tuple is (newPkg, oldPkg) ... so sort by new for obtup in sorted(ypl.obsoletesTuples, key=operator.itemgetter(0)): self.output.updatesObsoletesList(obtup, 'obsoletes', columns=columns) return ypl.updates or ypl.obsoletes def distro_sync_userlist(self, userlist): """ Upgrade or downgrade packages to match the latest versions available in the enabled repositories. :return: (exit_code, [ errors ]) exit_code is:: 0 = we're done, exit 1 = we've errored, exit with error string 2 = we've got work yet to do, onto the next stage """ oldcount = self._goal.req_length() if len(userlist) == 0: self.distro_sync() else: for pkg_spec in userlist: self.distro_sync(pkg_spec) cnt = self._goal.req_length() - oldcount if cnt <= 0 and not self._goal.req_has_distupgrade_all(): msg = _('No packages marked for distribution synchronization.') raise dnf.exceptions.Error(msg) def downgradePkgs(self, specs=[], file_pkgs=[], strict=False): """Attempt to take the user specified list of packages or wildcards and downgrade them. If a complete version number is specified, attempt to downgrade them to the specified version :param specs: a list of names or wildcards specifying packages to downgrade :param file_pkgs: a list of pkg objects from local files """ result = False for pkg in file_pkgs: try: self.package_downgrade(pkg, strict=strict) result = True except dnf.exceptions.MarkingError as e: logger.info(_('No match for argument: %s'), self.output.term.bold(pkg.location)) for arg in specs: try: self.downgrade_to(arg, strict=strict) result = True except dnf.exceptions.PackageNotFoundError as err: msg = _('No package %s available.') logger.info(msg, self.output.term.bold(arg)) except dnf.exceptions.PackagesNotInstalledError as err: logger.info(_('Packages for argument %s available, but not installed.'), self.output.term.bold(err.pkg_spec)) except dnf.exceptions.MarkingError: assert False if not result: raise dnf.exceptions.Error(_('No packages marked for downgrade.')) def output_packages(self, basecmd, pkgnarrow='all', patterns=(), reponame=None): """Output selection *pkgnarrow* of packages matching *patterns* and *repoid*.""" try: highlight = self.output.term.MODE['bold'] ypl = self.returnPkgLists( pkgnarrow, patterns, installed_available=highlight, reponame=reponame) except dnf.exceptions.Error as e: return 1, [str(e)] else: update_pkgs = {} inst_pkgs = {} local_pkgs = {} columns = None if basecmd == 'list': # Dynamically size the columns columns = _list_cmd_calc_columns(self.output, ypl) if highlight and ypl.installed: # If we have installed and available lists, then do the # highlighting for the installed packages so you can see what's # available to update, an extra, or newer than what we have. for pkg in (ypl.hidden_available + ypl.reinstall_available + ypl.old_available): key = (pkg.name, pkg.arch) if key not in update_pkgs or pkg > update_pkgs[key]: update_pkgs[key] = pkg if highlight and ypl.available: # If we have installed and available lists, then do the # highlighting for the available packages so you can see what's # available to install vs. update vs. old. for pkg in ypl.hidden_installed: key = (pkg.name, pkg.arch) if key not in inst_pkgs or pkg > inst_pkgs[key]: inst_pkgs[key] = pkg if highlight and ypl.updates: # Do the local/remote split we get in "yum updates" for po in sorted(ypl.updates): if po.reponame != hawkey.SYSTEM_REPO_NAME: local_pkgs[(po.name, po.arch)] = po # Output the packages: clio = self.conf.color_list_installed_older clin = self.conf.color_list_installed_newer clir = self.conf.color_list_installed_reinstall clie = self.conf.color_list_installed_extra rip = self.output.listPkgs(ypl.installed, _('Installed Packages'), basecmd, highlight_na=update_pkgs, columns=columns, highlight_modes={'>' : clio, '<' : clin, '=' : clir, 'not in' : clie}) clau = self.conf.color_list_available_upgrade clad = self.conf.color_list_available_downgrade clar = self.conf.color_list_available_reinstall clai = self.conf.color_list_available_install rap = self.output.listPkgs(ypl.available, _('Available Packages'), basecmd, highlight_na=inst_pkgs, columns=columns, highlight_modes={'<' : clau, '>' : clad, '=' : clar, 'not in' : clai}) raep = self.output.listPkgs(ypl.autoremove, _('Autoremove Packages'), basecmd, columns=columns) rep = self.output.listPkgs(ypl.extras, _('Extra Packages'), basecmd, columns=columns) cul = self.conf.color_update_local cur = self.conf.color_update_remote rup = self.output.listPkgs(ypl.updates, _('Available Upgrades'), basecmd, highlight_na=local_pkgs, columns=columns, highlight_modes={'=' : cul, 'not in' : cur}) # XXX put this into the ListCommand at some point if len(ypl.obsoletes) > 0 and basecmd == 'list': # if we've looked up obsolete lists and it's a list request rop = len(ypl.obsoletes) print(_('Obsoleting Packages')) for obtup in sorted(ypl.obsoletesTuples, key=operator.itemgetter(0)): self.output.updatesObsoletesList(obtup, 'obsoletes', columns=columns) else: rop = self.output.listPkgs(ypl.obsoletes, _('Obsoleting Packages'), basecmd, columns=columns) rrap = self.output.listPkgs(ypl.recent, _('Recently Added Packages'), basecmd, columns=columns) if len(patterns) and \ rrap == 0 and rop == 0 and rup == 0 and rep == 0 and rap == 0 and raep == 0 and rip == 0: raise dnf.exceptions.Error(_('No matching Packages to list')) def returnPkgLists(self, pkgnarrow='all', patterns=None, installed_available=False, reponame=None): """Return a :class:`dnf.yum.misc.GenericHolder` object containing lists of package objects that match the given names or wildcards. :param pkgnarrow: a string specifying which types of packages lists to produce, such as updates, installed, available, etc. :param patterns: a list of names or wildcards specifying packages to list :param installed_available: whether the available package list is present as .hidden_available when doing all, available, or installed :param reponame: limit packages list to the given repository :return: a :class:`dnf.yum.misc.GenericHolder` instance with the following lists defined:: available = list of packageObjects installed = list of packageObjects upgrades = tuples of packageObjects (updating, installed) extras = list of packageObjects obsoletes = tuples of packageObjects (obsoleting, installed) recent = list of packageObjects """ done_hidden_available = False done_hidden_installed = False if installed_available and pkgnarrow == 'installed': done_hidden_available = True pkgnarrow = 'all' elif installed_available and pkgnarrow == 'available': done_hidden_installed = True pkgnarrow = 'all' ypl = self._do_package_lists( pkgnarrow, patterns, ignore_case=True, reponame=reponame) if self.conf.showdupesfromrepos: for pkg in ypl.reinstall_available: if not pkg.installed and not done_hidden_available: ypl.available.append(pkg) if installed_available: ypl.hidden_available = ypl.available ypl.hidden_installed = ypl.installed if done_hidden_available: ypl.available = [] if done_hidden_installed: ypl.installed = [] return ypl def provides(self, args): """Print out a list of packages that provide the given file or feature. This a cli wrapper to the provides methods in the rpmdb and pkgsack. :param args: the name of a file or feature to search for :return: (exit_code, [ errors ]) exit_code is:: 0 = we're done, exit 1 = we've errored, exit with error string 2 = we've got work yet to do, onto the next stage """ # always in showdups mode old_sdup = self.conf.showdupesfromrepos self.conf.showdupesfromrepos = True matches = [] used_search_strings = [] for spec in args: query, used_search_string = super(BaseCli, self).provides(spec) matches.extend(query) used_search_strings.extend(used_search_string) for pkg in sorted(matches): self.output.matchcallback_verbose(pkg, used_search_strings, args) self.conf.showdupesfromrepos = old_sdup if not matches: raise dnf.exceptions.Error(_('No matches found. If searching for a file, ' 'try specifying the full path or using a ' 'wildcard prefix ("*/") at the beginning.')) def _promptWanted(self): # shortcut for the always-off/always-on options if self.conf.assumeyes and not self.conf.assumeno: return False return True class Cli(object): def __init__(self, base): self.base = base self.cli_commands = {} self.command = None self.demands = dnf.cli.demand.DemandSheet() # :api self.register_command(dnf.cli.commands.alias.AliasCommand) self.register_command(dnf.cli.commands.autoremove.AutoremoveCommand) self.register_command(dnf.cli.commands.check.CheckCommand) self.register_command(dnf.cli.commands.clean.CleanCommand) self.register_command(dnf.cli.commands.distrosync.DistroSyncCommand) self.register_command(dnf.cli.commands.deplist.DeplistCommand) self.register_command(dnf.cli.commands.downgrade.DowngradeCommand) self.register_command(dnf.cli.commands.group.GroupCommand) self.register_command(dnf.cli.commands.history.HistoryCommand) self.register_command(dnf.cli.commands.install.InstallCommand) self.register_command(dnf.cli.commands.makecache.MakeCacheCommand) self.register_command(dnf.cli.commands.mark.MarkCommand) self.register_command(dnf.cli.commands.module.ModuleCommand) self.register_command(dnf.cli.commands.reinstall.ReinstallCommand) self.register_command(dnf.cli.commands.remove.RemoveCommand) self.register_command(dnf.cli.commands.repolist.RepoListCommand) self.register_command(dnf.cli.commands.repoquery.RepoQueryCommand) self.register_command(dnf.cli.commands.search.SearchCommand) self.register_command(dnf.cli.commands.shell.ShellCommand) self.register_command(dnf.cli.commands.swap.SwapCommand) self.register_command(dnf.cli.commands.updateinfo.UpdateInfoCommand) self.register_command(dnf.cli.commands.upgrade.UpgradeCommand) self.register_command(dnf.cli.commands.upgrademinimal.UpgradeMinimalCommand) self.register_command(dnf.cli.commands.InfoCommand) self.register_command(dnf.cli.commands.ListCommand) self.register_command(dnf.cli.commands.ProvidesCommand) self.register_command(dnf.cli.commands.CheckUpdateCommand) self.register_command(dnf.cli.commands.RepoPkgsCommand) self.register_command(dnf.cli.commands.HelpCommand) def _configure_repos(self, opts): self.base.read_all_repos(opts) if opts.repofrompath: for label, path in opts.repofrompath.items(): this_repo = self.base.repos.add_new_repo(label, self.base.conf, baseurl=[path]) this_repo._configure_from_options(opts) # do not let this repo to be disabled opts.repos_ed.append((label, "enable")) if opts.repo: opts.repos_ed.insert(0, ("*", "disable")) opts.repos_ed.extend([(r, "enable") for r in opts.repo]) notmatch = set() # Process repo enables and disables in order try: for (repo, operation) in opts.repos_ed: repolist = self.base.repos.get_matching(repo) if not repolist: if self.base.conf.strict and operation == "enable": msg = _("Unknown repo: '%s'") raise dnf.exceptions.RepoError(msg % repo) notmatch.add(repo) if operation == "enable": repolist.enable() else: repolist.disable() except dnf.exceptions.ConfigError as e: logger.critical(e) self.optparser.print_help() sys.exit(1) for repo in notmatch: logger.warning(_("No repository match: %s"), repo) expired_repos = self.base._repo_persistor.get_expired_repos() if expired_repos is None: expired_repos = self.base.repos.keys() for rid in expired_repos: repo = self.base.repos.get(rid) if repo: repo._repo.expire() # setup the progress bars/callbacks (bar, self.base._ds_callback) = self.base.output.setup_progress_callbacks() self.base.repos.all().set_progress_bar(bar) key_import = output.CliKeyImport(self.base, self.base.output) self.base.repos.all()._set_key_import(key_import) def _log_essentials(self): logger.debug('{prog} version: %s'.format(prog=dnf.util.MAIN_PROG_UPPER), dnf.const.VERSION) logger.log(dnf.logging.DDEBUG, 'Command: %s', self.cmdstring) logger.log(dnf.logging.DDEBUG, 'Installroot: %s', self.base.conf.installroot) logger.log(dnf.logging.DDEBUG, 'Releasever: %s', self.base.conf.releasever) logger.debug("cachedir: %s", self.base.conf.cachedir) def _process_demands(self): demands = self.demands repos = self.base.repos if demands.root_user: if not dnf.util.am_i_root(): raise dnf.exceptions.Error( _('This command has to be run with superuser privileges ' '(under the root user on most systems).')) if demands.changelogs: for repo in repos.iter_enabled(): repo.load_metadata_other = True if demands.cacheonly or self.base.conf.cacheonly: self.base.conf.cacheonly = True for repo in repos.values(): repo._repo.setSyncStrategy(dnf.repo.SYNC_ONLY_CACHE) else: if demands.freshest_metadata: for repo in repos.iter_enabled(): repo._repo.expire() elif not demands.fresh_metadata: for repo in repos.values(): repo._repo.setSyncStrategy(dnf.repo.SYNC_LAZY) if demands.sack_activation: self.base.fill_sack( load_system_repo='auto' if self.demands.load_system_repo else False, load_available_repos=self.demands.available_repos) def _parse_commands(self, opts, args): """Check that the requested CLI command exists.""" basecmd = opts.command command_cls = self.cli_commands.get(basecmd) if command_cls is None: logger.critical(_('No such command: %s. Please use %s --help'), basecmd, sys.argv[0]) if self.base.conf.plugins: logger.critical(_("It could be a {PROG} plugin command, " "try: \"{prog} install 'dnf-command(%s)'\"").format( prog=dnf.util.MAIN_PROG, PROG=dnf.util.MAIN_PROG_UPPER), basecmd) else: logger.critical(_("It could be a {prog} plugin command, " "but loading of plugins is currently disabled.").format( prog=dnf.util.MAIN_PROG_UPPER)) raise CliError self.command = command_cls(self) logger.log(dnf.logging.DDEBUG, 'Base command: %s', basecmd) logger.log(dnf.logging.DDEBUG, 'Extra commands: %s', args) def configure(self, args, option_parser=None): """Parse command line arguments, and set up :attr:`self.base.conf` and :attr:`self.cmds`, as well as logger objects in base instance. :param args: a list of command line arguments :param option_parser: a class for parsing cli options """ aliases = dnf.cli.aliases.Aliases() args = aliases.resolve(args) self.optparser = dnf.cli.option_parser.OptionParser() \ if option_parser is None else option_parser opts = self.optparser.parse_main_args(args) # Just print out the version if that's what the user wanted if opts.version: print(dnf.const.VERSION) print_versions(self.base.conf.history_record_packages, self.base, self.base.output) sys.exit(0) if opts.quiet: opts.debuglevel = 0 opts.errorlevel = 2 if opts.verbose: opts.debuglevel = opts.errorlevel = dnf.const.VERBOSE_LEVEL # Read up configuration options and initialize plugins try: if opts.cacheonly: self.base.conf._set_value("cachedir", self.base.conf.system_cachedir, dnf.conf.PRIO_DEFAULT) self.demands.cacheonly = True self.base.conf._configure_from_options(opts) self._read_conf_file(opts.releasever) if 'arch' in opts: self.base.conf.arch = opts.arch self.base.conf._adjust_conf_options() except (dnf.exceptions.ConfigError, ValueError) as e: logger.critical(_('Config error: %s'), e) sys.exit(1) except IOError as e: e = '%s: %s' % (ucd(str(e)), repr(e.filename)) logger.critical(_('Config error: %s'), e) sys.exit(1) if opts.destdir is not None: self.base.conf.destdir = opts.destdir if not self.base.conf.downloadonly and opts.command not in ( 'download', 'system-upgrade', 'reposync', 'modulesync'): logger.critical(_('--destdir or --downloaddir must be used with --downloadonly ' 'or download or system-upgrade command.') ) sys.exit(1) if (opts.set_enabled or opts.set_disabled) and opts.command != 'config-manager': logger.critical( _('--enable, --set-enabled and --disable, --set-disabled ' 'must be used with config-manager command.')) sys.exit(1) if opts.sleeptime is not None: time.sleep(random.randrange(opts.sleeptime * 60)) # store the main commands & summaries, before plugins are loaded self.optparser.add_commands(self.cli_commands, 'main') # store the plugin commands & summaries self.base.init_plugins(opts.disableplugin, opts.enableplugin, self) self.optparser.add_commands(self.cli_commands,'plugin') # show help if no command specified # this is done here, because we first have the full # usage info after the plugins are loaded. if not opts.command: self.optparser.print_help() sys.exit(0) # save our original args out self.base.args = args # save out as a nice command string self.cmdstring = self.optparser.prog + ' ' for arg in self.base.args: self.cmdstring += '%s ' % arg self._log_essentials() try: self._parse_commands(opts, args) except CliError: sys.exit(1) # show help for dnf <command> --help / --help-cmd if opts.help: self.optparser.print_help(self.command) sys.exit(0) opts = self.optparser.parse_command_args(self.command, args) if opts.allowerasing: self.demands.allow_erasing = opts.allowerasing self.base._allow_erasing = True if opts.freshest_metadata: self.demands.freshest_metadata = opts.freshest_metadata if opts.debugsolver: self.base.conf.debug_solver = True if opts.obsoletes: self.base.conf.obsoletes = True self.command.pre_configure() self.base.pre_configure_plugins() # with cachedir in place we can configure stuff depending on it: self.base._activate_persistor() self._configure_repos(opts) self.base.configure_plugins() self.base.conf._configure_from_options(opts) self.command.configure() if self.base.conf.destdir: dnf.util.ensure_dir(self.base.conf.destdir) self.base.repos.all().pkgdir = self.base.conf.destdir if self.base.conf.color != 'auto': self.base.output.term.reinit(color=self.base.conf.color) if rpm.expandMacro('%_pkgverify_level') in ('signature', 'all'): forcing = False for repo in self.base.repos.iter_enabled(): if repo.gpgcheck: continue repo.gpgcheck = True forcing = True if not self.base.conf.localpkg_gpgcheck: self.base.conf.localpkg_gpgcheck = True forcing = True if forcing: logger.warning( _("Warning: Enforcing GPG signature check globally " "as per active RPM security policy (see 'gpgcheck' in " "dnf.conf(5) for how to squelch this message)" ) ) def _read_conf_file(self, releasever=None): timer = dnf.logging.Timer('config') conf = self.base.conf # replace remote config path with downloaded file conf._check_remote_file('config_file_path') # search config file inside the installroot first conf._search_inside_installroot('config_file_path') # check whether a config file is requested from command line and the file exists filename = conf._get_value('config_file_path') if (conf._get_priority('config_file_path') == dnf.conf.PRIO_COMMANDLINE) and \ not os.path.isfile(filename): raise dnf.exceptions.ConfigError(_('Config file "{}" does not exist').format(filename)) # read config conf.read(priority=dnf.conf.PRIO_MAINCONFIG) # search reposdir file inside the installroot first from_root = conf._search_inside_installroot('reposdir') # Update vars from same root like repos were taken if conf._get_priority('varsdir') == dnf.conf.PRIO_COMMANDLINE: from_root = "/" subst = conf.substitutions subst.update_from_etc(from_root, varsdir=conf._get_value('varsdir')) # cachedir, logs, releasever, and gpgkey are taken from or stored in installroot if releasever is None and conf.releasever is None: releasever = dnf.rpm.detect_releasever(conf.installroot) elif releasever == '/': releasever = dnf.rpm.detect_releasever(releasever) if releasever is not None: conf.releasever = releasever if conf.releasever is None: logger.warning(_("Unable to detect release version (use '--releasever' to specify " "release version)")) for opt in ('cachedir', 'logdir', 'persistdir'): conf.prepend_installroot(opt) self.base._logging._setup_from_dnf_conf(conf) timer() return conf def _populate_update_security_filter(self, opts, cmp_type='eq', all=None): """ :param opts: :param cmp_type: string supported "eq", "gte" :param all: :return: """ if (opts is None) and (all is None): return types = [] if opts.bugfix or all: types.append('bugfix') if opts.enhancement or all: types.append('enhancement') if opts.newpackage or all: types.append('newpackage') if opts.security or all: types.append('security') self.base.add_security_filters(cmp_type, types=types, advisory=opts.advisory, bugzilla=opts.bugzilla, cves=opts.cves, severity=opts.severity) def redirect_logger(self, stdout=None, stderr=None): # :api """ Change minimal logger level for terminal output to stdout and stderr according to specific command requirements @param stdout: logging.INFO, logging.WARNING, ... @param stderr:logging.INFO, logging.WARNING, ... """ if stdout is not None: self.base._logging.stdout_handler.setLevel(stdout) if stderr is not None: self.base._logging.stderr_handler.setLevel(stderr) def redirect_repo_progress(self, fo=sys.stderr): progress = dnf.cli.progress.MultiFileProgressMeter(fo) self.base.output.progress = progress self.base.repos.all().set_progress_bar(progress) def _check_running_kernel(self): kernel = self.base.sack.get_running_kernel() if kernel is None: return q = self.base.sack.query().filterm(provides=kernel.name) q = q.installed() q.filterm(advisory_type='security') ikpkg = kernel for pkg in q: if pkg > ikpkg: ikpkg = pkg if ikpkg > kernel: print('Security: %s is an installed security update' % ikpkg) print('Security: %s is the currently running version' % kernel) def _option_conflict(self, option_string_1, option_string_2): print(self.optparser.print_usage()) raise dnf.exceptions.Error(_("argument {}: not allowed with argument {}".format( option_string_1, option_string_2))) def register_command(self, command_cls): """Register a Command. :api""" for name in command_cls.aliases: if name in self.cli_commands: raise dnf.exceptions.ConfigError(_('Command "%s" already defined') % name) self.cli_commands[name] = command_cls def run(self): """Call the base command, and pass it the extended commands or arguments. :return: (exit_code, [ errors ]) exit_code is:: 0 = we're done, exit 1 = we've errored, exit with error string 2 = we've got work yet to do, onto the next stage """ self._process_demands() # Reports about excludes and includes (but not from plugins) if self.base.conf.excludepkgs: logger.debug( _('Excludes in dnf.conf: ') + ", ".join(sorted(set(self.base.conf.excludepkgs)))) if self.base.conf.includepkgs: logger.debug( _('Includes in dnf.conf: ') + ", ".join(sorted(set(self.base.conf.includepkgs)))) for repo in self.base.repos.iter_enabled(): if repo.excludepkgs: logger.debug(_('Excludes in repo ') + repo.id + ": " + ", ".join(sorted(set(repo.excludepkgs)))) if repo.includepkgs: logger.debug(_('Includes in repo ') + repo.id + ": " + ", ".join(sorted(set(repo.includepkgs)))) return self.command.run()