File Explorer

/proc/thread-self/root/proc/self/root/usr/lib/python3.9/site-packages/dnf/cli/commands

This explorer reads the filesystem of the server it runs on, so /workspace/user isn't present here. Browsing and the terminal still work against this server's own disk from /.

module.py16.5 KB · 414 lines
# supplies the 'module' command.## Copyright (C) 2014-2017  Red Hat, Inc.## This copyrighted material is made available to anyone wishing to use,# modify, copy, or redistribute it subject to the terms and conditions of# the GNU General Public License v.2, or (at your option) any later version.# This program is distributed in the hope that it will be useful, but WITHOUT# ANY WARRANTY expressed or implied, including the implied warranties of# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General# Public License for more details.  You should have received a copy of the# GNU General Public License along with this program; if not, write to the# Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA# 02110-1301, USA.  Any Red Hat trademarks that are incorporated in the# source code or documentation are not subject to the GNU General Public# License and may only be used or replicated with the express permission of# Red Hat, Inc.# from __future__ import print_function from dnf.cli import commands, CliErrorfrom dnf.i18n import _from dnf.module.exceptions import NoModuleExceptionfrom dnf.util import loggerimport dnf.util import sysimport os import hawkeyimport libdnfimport dnf.module.module_baseimport dnf.exceptions  class ModuleCommand(commands.Command):    class SubCommand(commands.Command):         def __init__(self, cli):            super(ModuleCommand.SubCommand, self).__init__(cli)            self.module_base = dnf.module.module_base.ModuleBase(self.base)         def _get_modules_from_name_stream_specs(self):            modules_from_specs = set()            for module_spec in self.opts.module_spec:                __, nsvcap = self.module_base._get_modules(module_spec)                # When there is no match, the problem was already reported by module_base.remove()                if nsvcap is None:                    continue                name = nsvcap.name if nsvcap.name else ""                stream = nsvcap.stream if nsvcap.stream else ""                if (nsvcap.version and nsvcap.version != -1) or nsvcap.context:                    logger.info(_("Only module name, stream, architecture or profile is used. "                                  "Ignoring unneeded information in argument: '{}'").format(                        module_spec))                arch = nsvcap.arch if nsvcap.arch else ""                modules = self.base._moduleContainer.query(name, stream, "", "", arch)                modules_from_specs.update(modules)            return modules_from_specs         def _get_module_artifact_names(self, use_modules, skip_modules):            artifacts = set()            pkg_names = set()            for module in use_modules:                if module not in skip_modules:                    if self.base._moduleContainer.isModuleActive(module):                        artifacts.update(module.getArtifacts())            for artifact in artifacts:                subj = hawkey.Subject(artifact)                for nevra_obj in subj.get_nevra_possibilities(                        forms=[hawkey.FORM_NEVRA]):                    if nevra_obj.name:                        pkg_names.add(nevra_obj.name)            return pkg_names, artifacts     class ListSubCommand(SubCommand):         aliases = ('list',)        summary = _('list all module streams, profiles and states')         def configure(self):            demands = self.cli.demands            demands.available_repos = True            demands.sack_activation = True         def run_on_module(self):            mods = self.module_base             if self.opts.enabled:                output = mods._get_brief_description(                    self.opts.module_spec, libdnf.module.ModulePackageContainer.ModuleState_ENABLED)            elif self.opts.disabled:                output = mods._get_brief_description(                    self.opts.module_spec,                    libdnf.module.ModulePackageContainer.ModuleState_DISABLED)            elif self.opts.installed:                output = mods._get_brief_description(                    self.opts.module_spec,                    libdnf.module.ModulePackageContainer.ModuleState_INSTALLED)            else:                output = mods._get_brief_description(                    self.opts.module_spec, libdnf.module.ModulePackageContainer.ModuleState_UNKNOWN)            if output:                print(output)                return            if self.opts.module_spec:                msg = _('No matching Modules to list')                raise dnf.exceptions.Error(msg)     class InfoSubCommand(SubCommand):         aliases = ('info',)        summary = _('print detailed information about a module')         def configure(self):            demands = self.cli.demands            demands.available_repos = True            demands.sack_activation = True         def run_on_module(self):            if self.opts.verbose:                output = self.module_base._get_full_info(self.opts.module_spec)            elif self.opts.profile:                output = self.module_base._get_info_profiles(self.opts.module_spec)            else:                output = self.module_base._get_info(self.opts.module_spec)            if output:                print(output)            else:                raise dnf.exceptions.Error(_('No matching Modules to list'))     class EnableSubCommand(SubCommand):         aliases = ('enable',)        summary = _('enable a module stream')         def configure(self):            demands = self.cli.demands            demands.available_repos = True            demands.sack_activation = True            demands.resolving = True            demands.root_user = True         def run_on_module(self):            try:                self.module_base.enable(self.opts.module_spec)            except dnf.exceptions.MarkingErrors as e:                if self.base.conf.strict:                    if e.no_match_group_specs or e.error_group_specs:                        raise e                    if e.module_depsolv_errors and e.module_depsolv_errors[1] != \                            libdnf.module.ModulePackageContainer.ModuleErrorType_ERROR_IN_DEFAULTS:                        raise e                logger.error(str(e))     class DisableSubCommand(SubCommand):         aliases = ('disable',)        summary = _('disable a module with all its streams')         def configure(self):            demands = self.cli.demands            demands.available_repos = True            demands.sack_activation = True            demands.resolving = True            demands.root_user = True         def run_on_module(self):            try:                self.module_base.disable(self.opts.module_spec)            except dnf.exceptions.MarkingErrors as e:                if self.base.conf.strict:                    if e.no_match_group_specs or e.error_group_specs:                        raise e                    if e.module_depsolv_errors and e.module_depsolv_errors[1] != \                            libdnf.module.ModulePackageContainer.ModuleErrorType_ERROR_IN_DEFAULTS:                        raise e                logger.error(str(e))     class ResetSubCommand(SubCommand):         aliases = ('reset',)        summary = _('reset a module')         def configure(self):            demands = self.cli.demands            demands.available_repos = True            demands.sack_activation = True            demands.resolving = True            demands.root_user = True         def run_on_module(self):            try:                self.module_base.reset(self.opts.module_spec)            except dnf.exceptions.MarkingErrors as e:                if self.base.conf.strict:                    if e.no_match_group_specs:                        raise e                logger.error(str(e))     class InstallSubCommand(SubCommand):         aliases = ('install',)        summary = _('install a module profile including its packages')         def configure(self):            demands = self.cli.demands            demands.available_repos = True            demands.sack_activation = True            demands.resolving = True            demands.root_user = True         def run_on_module(self):            try:                self.module_base.install(self.opts.module_spec, self.base.conf.strict)            except dnf.exceptions.MarkingErrors as e:                if self.base.conf.strict:                    if e.no_match_group_specs or e.error_group_specs:                        raise e                logger.error(str(e))     class UpdateSubCommand(SubCommand):         aliases = ('update',)        summary = _('update packages associated with an active stream')         def configure(self):            demands = self.cli.demands            demands.available_repos = True            demands.sack_activation = True            demands.resolving = True            demands.root_user = True         def run_on_module(self):            module_specs = self.module_base.upgrade(self.opts.module_spec)            if module_specs:                raise NoModuleException(", ".join(module_specs))     class RemoveSubCommand(SubCommand):         aliases = ('remove', 'erase',)        summary = _('remove installed module profiles and their packages')         def configure(self):            demands = self.cli.demands            demands.allow_erasing = True            demands.available_repos = True            demands.fresh_metadata = False            demands.resolving = True            demands.root_user = True            demands.sack_activation = True         def run_on_module(self):            skipped_groups = self.module_base.remove(self.opts.module_spec)            if self.opts.all:                modules_from_specs = self._get_modules_from_name_stream_specs()                remove_names_from_spec, __ = self._get_module_artifact_names(                    modules_from_specs, set())                keep_names, __ = self._get_module_artifact_names(                    self.base._moduleContainer.getModulePackages(), modules_from_specs)                remove_query = self.base.sack.query().installed().filterm(                    name=remove_names_from_spec)                keep_query = self.base.sack.query().installed().filterm(name=keep_names)                for pkg in remove_query:                    if pkg in keep_query:                        msg = _("Package {} belongs to multiple modules, skipping").format(pkg)                        logger.info(msg)                    else:                        self.base.goal.erase(                            pkg, clean_deps=self.base.conf.clean_requirements_on_remove)            if not skipped_groups:                return             logger.error(dnf.exceptions.MarkingErrors(no_match_group_specs=skipped_groups))     class SwitchToSubCommand(SubCommand):         aliases = ('switch-to',)        summary = _('switch a module to a stream and distrosync rpm packages')         def configure(self):            demands = self.cli.demands            demands.available_repos = True            demands.sack_activation = True            demands.resolving = True            demands.root_user = True            self.base.conf.module_stream_switch = True         def run_on_module(self):            try:                self.module_base.switch_to(self.opts.module_spec, strict=self.base.conf.strict)            except dnf.exceptions.MarkingErrors as e:                if self.base.conf.strict:                    if e.no_match_group_specs or e.error_group_specs:                        raise e                logger.error(str(e))     class ProvidesSubCommand(SubCommand):         aliases = ("provides", )        summary = _('list modular packages')         def configure(self):            demands = self.cli.demands            demands.available_repos = True            demands.sack_activation = True         def run_on_module(self):            output = self.module_base._what_provides(self.opts.module_spec)            if output:                print(output)     class RepoquerySubCommand(SubCommand):         aliases = ("repoquery", )        summary = _('list packages belonging to a module')         def configure(self):            demands = self.cli.demands            demands.available_repos = True            demands.sack_activation = True         def run_on_module(self):            modules_from_specs = set()            for module_spec in self.opts.module_spec:                modules, __ = self.module_base._get_modules(module_spec)                modules_from_specs.update(modules)            names_from_spec, spec_artifacts = self._get_module_artifact_names(                modules_from_specs, set())            package_strings = set()            if self.opts.available or not self.opts.installed:                query = self.base.sack.query().available().filterm(nevra_strict=spec_artifacts)                for pkg in query:                    package_strings.add(str(pkg))            if self.opts.installed:                query = self.base.sack.query().installed().filterm(name=names_from_spec)                for pkg in query:                    package_strings.add(str(pkg))             output = "\n".join(sorted(package_strings))            print(output)      SUBCMDS = {ListSubCommand, InfoSubCommand, EnableSubCommand,               DisableSubCommand, ResetSubCommand, InstallSubCommand, UpdateSubCommand,               RemoveSubCommand, SwitchToSubCommand, ProvidesSubCommand, RepoquerySubCommand}     SUBCMDS_NOT_REQUIRED_ARG = {ListSubCommand}     aliases = ("module",)    summary = _("Interact with Modules.")     def __init__(self, cli):        super(ModuleCommand, self).__init__(cli)        subcmd_objs = (subcmd(cli) for subcmd in self.SUBCMDS)        self.subcmd = None        self._subcmd_name2obj = {            alias: subcmd for subcmd in subcmd_objs for alias in subcmd.aliases}     def set_argparser(self, parser):        narrows = parser.add_mutually_exclusive_group()        narrows.add_argument('--enabled', dest='enabled',                             action='store_true',                             help=_("show only enabled modules"))        narrows.add_argument('--disabled', dest='disabled',                             action='store_true',                             help=_("show only disabled modules"))        narrows.add_argument('--installed', dest='installed',                             action='store_true',                             help=_("show only installed modules or packages"))        narrows.add_argument('--profile', dest='profile',                             action='store_true',                             help=_("show profile content"))        parser.add_argument('--available', dest='available', action='store_true',                            help=_("show only available packages"))        narrows.add_argument('--all', dest='all',                             action='store_true',                             help=_("remove all modular packages"))        subcommand_choices = []        subcommand_help = []        for subcmd in sorted(self.SUBCMDS, key=lambda x: x.aliases[0]):            subcommand_choices.append(subcmd.aliases[0])            subcommand_help.append('{}: {}'.format(subcmd.aliases[0], subcmd.summary or ''))        parser.add_argument('subcmd', nargs=1, choices=subcommand_choices,                            metavar='<modular command>',                            help='\n'.join(subcommand_help))        parser.add_argument('module_spec', metavar='module-spec', nargs='*',                            help=_("Module specification"))     def configure(self):        try:            self.subcmd = self._subcmd_name2obj[self.opts.subcmd[0]]        except (CliError, KeyError):            self.cli.optparser.print_usage()            raise CliError        self.subcmd.opts = self.opts        self.subcmd.configure()     def run(self):        self.check_required_argument()        self.subcmd.run_on_module()     def check_required_argument(self):        not_required_argument = [alias                                 for subcmd in self.SUBCMDS_NOT_REQUIRED_ARG                                 for alias in subcmd.aliases]        if self.opts.subcmd[0] not in not_required_argument:            if not self.opts.module_spec:                raise CliError(                    _("{} {} {}: too few arguments").format(dnf.util.MAIN_PROG,                                                            self.opts.command,                                                            self.opts.subcmd[0]))