File Explorer

/proc/self/root/proc/thread-self/root/usr/lib/python3.9/site-packages/dnf/cli/commands

This explorer reads the filesystem of the server it runs on, so /workspace/user isn't present here. Browsing and the terminal still work against this server's own disk from /.

updateinfo.py · 22.8 KB · 505 lines
# updateinfo.py# UpdateInfo CLI command.## Copyright (C) 2014-2016 Red Hat, Inc.## This copyrighted material is made available to anyone wishing to use,# modify, copy, or redistribute it subject to the terms and conditions of# the GNU General Public License v.2, or (at your option) any later version.# This program is distributed in the hope that it will be useful, but WITHOUT# ANY WARRANTY expressed or implied, including the implied warranties of# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General# Public License for more details.  You should have received a copy of the# GNU General Public License along with this program; if not, write to the# Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA# 02110-1301, USA.  Any Red Hat trademarks that are incorporated in the# source code or documentation are not subject to the GNU General Public# License and may only be used or replicated with the express permission of# Red Hat, Inc.# """UpdateInfo CLI command."""from __future__ import absolute_importfrom __future__ import print_functionfrom __future__ import unicode_literals import collectionsimport fnmatch import hawkeyfrom dnf.cli import commandsfrom dnf.cli.option_parser import OptionParserfrom dnf.i18n import _, exact_widthfrom dnf.pycomp import unicode import sysimport json def _maxlen(iterable):    """Return maximum length of items in a non-empty iterable."""    return max(exact_width(item) for item in iterable)  class UpdateInfoCommand(commands.Command):    """Implementation of the UpdateInfo command."""     TYPE2LABEL = {hawkey.ADVISORY_BUGFIX: _('bugfix'),                  hawkey.ADVISORY_ENHANCEMENT: _('enhancement'),                  hawkey.ADVISORY_SECURITY: _('security'),                  hawkey.ADVISORY_UNKNOWN: _('unknown'),                  hawkey.ADVISORY_NEWPACKAGE: _('newpackage')}     SECURITY2LABEL = {'Critical': _('Critical/Sec.'),                      'Important': _('Important/Sec.'),                      
'Moderate': _('Moderate/Sec.'),                      'Medium': _('Medium/Sec.'),                      'Low': _('Low/Sec.')}     MODERATES = {'Moderate', 'Medium'}     direct_commands = {'list-updateinfo'    : 'list',                       'list-security'      : 'list',                       'list-sec'           : 'list',                       'info-updateinfo'    : 'info',                       'info-security'      : 'info',                       'info-sec'           : 'info',                       'summary-updateinfo' : 'summary'}    aliases = ['updateinfo', 'upif'] + list(direct_commands.keys())    summary = _('display advisories about packages')    availability_default = 'available'    availabilities = ['installed', 'updates', 'all', availability_default]     def __init__(self, cli):        """Initialize the command."""        super(UpdateInfoCommand, self).__init__(cli)        self._installed_query = None     @staticmethod    def set_argparser(parser):        availability = parser.add_mutually_exclusive_group()        availability.add_argument(            "--available", dest='_availability', const='available', action='store_const',            help=_("advisories about newer versions of installed packages (default)"))        availability.add_argument(            "--installed", dest='_availability', const='installed', action='store_const',            help=_("advisories about equal and older versions of installed packages"))        availability.add_argument(            "--updates", dest='_availability', const='updates', action='store_const',            help=_("advisories about newer versions of those installed packages "                   "for which a newer version is available"))        availability.add_argument(            "--all", dest='_availability', const='all', action='store_const',            help=_("advisories about any versions of installed packages"))        cmds = ['summary', 'list', 'info']        output_format = parser.add_mutually_exclusive_group()   
     output_format.add_argument("--summary", dest='_spec_action', const='summary',                                   action='store_const',                                   help=_('show summary of advisories (default)'))        output_format.add_argument("--list", dest='_spec_action', const='list',                                   action='store_const',                                   help=_('show list of advisories'))        output_format.add_argument("--info", dest='_spec_action', const='info',                                   action='store_const',                                   help=_('show info of advisories'))        parser.add_argument("--with-cve", dest='with_cve', default=False,                            action='store_true',                            help=_('show only advisories with CVE reference'))        parser.add_argument("--with-bz", dest='with_bz', default=False,                            action='store_true',                            help=_('show only advisories with bugzilla reference'))        parser.add_argument("--json", dest='json', default=False,                            action='store_true',                            help=_('Display in JSON format.'))        parser.add_argument('spec', nargs='*', metavar='SPEC',                            choices=cmds, default=cmds[0],                            action=OptionParser.PkgNarrowCallback,                            help=_("Package specification"))     def configure(self):        """Do any command-specific configuration based on command arguments."""        self.cli.demands.available_repos = True        self.cli.demands.sack_activation = True         if self.opts.command in self.direct_commands:            # we were called with direct command            self.opts.spec_action = self.direct_commands[self.opts.command]        else:            if self.opts._spec_action:                self.opts.spec_action = self.opts._spec_action         if self.opts._availability:            
self.opts.availability = self.opts._availability        else:            # yum compatibility - search for all|available|installed|updates in spec[0]            if not self.opts.spec or self.opts.spec[0] not in self.availabilities:                self.opts.availability = self.availability_default            else:                self.opts.availability = self.opts.spec.pop(0)         # filtering by advisory types (security/bugfix/enhancement/newpackage)        self.opts._advisory_types = set()        if self.opts.bugfix:            self.opts._advisory_types.add(hawkey.ADVISORY_BUGFIX)        if self.opts.enhancement:            self.opts._advisory_types.add(hawkey.ADVISORY_ENHANCEMENT)        if self.opts.newpackage:            self.opts._advisory_types.add(hawkey.ADVISORY_NEWPACKAGE)        if self.opts.security:            self.opts._advisory_types.add(hawkey.ADVISORY_SECURITY)         # yum compatibility - yum accepts types also as positional arguments        if self.opts.spec:            spec = self.opts.spec.pop(0)            if spec == 'bugfix':                self.opts._advisory_types.add(hawkey.ADVISORY_BUGFIX)            elif spec == 'enhancement':                self.opts._advisory_types.add(hawkey.ADVISORY_ENHANCEMENT)            elif spec in ('security', 'sec'):                self.opts._advisory_types.add(hawkey.ADVISORY_SECURITY)            elif spec == 'newpackage':                self.opts._advisory_types.add(hawkey.ADVISORY_NEWPACKAGE)            elif spec in ('bugzillas', 'bzs'):                self.opts.with_bz = True            elif spec == 'cves':                self.opts.with_cve = True            else:                self.opts.spec.insert(0, spec)         # Keep quiet when dumping JSON output        if self.opts.json:            self.cli.redirect_logger(stdout=sys.maxsize, stderr=sys.maxsize)         if self.opts.advisory:            self.opts.spec.extend(self.opts.advisory)      def run(self):        """Execute the command with arguments."""    
     # If any moderate-type severities are specified, then all        # moderate-type severities should be included in the results        if self.opts.severity and self.MODERATES.intersection(self.opts.severity):            self.opts.severity = list(self.MODERATES.union(self.opts.severity))         if self.opts.availability == 'installed':            apkg_adv_insts = self.installed_apkg_adv_insts(self.opts.spec)            description = _('installed')        elif self.opts.availability == 'updates':            apkg_adv_insts = self.updating_apkg_adv_insts(self.opts.spec)            description = _('updates')        elif self.opts.availability == 'all':            apkg_adv_insts = self.all_apkg_adv_insts(self.opts.spec)            description = _('all')        else:            apkg_adv_insts = self.available_apkg_adv_insts(self.opts.spec)            description = _('available')         if self.opts.spec_action == 'list':            self.display_list(apkg_adv_insts)        elif self.opts.spec_action == 'info':            self.display_info(apkg_adv_insts)        else:            self.display_summary(apkg_adv_insts, description)     def _newer_equal_installed(self, apackage):        if self._installed_query is None:            self._installed_query = self.base.sack.query().installed().apply()        q = self._installed_query.filter(name=apackage.name, evr__gte=apackage.evr)        return len(q) > 0     def _advisory_matcher(self, advisory):        if not self.opts._advisory_types \                and not self.opts.spec \                and not self.opts.severity \                and not self.opts.bugzilla \                and not self.opts.cves \                and not self.opts.with_cve \                and not self.opts.with_bz:            return True        if advisory.type in self.opts._advisory_types:            return True        if any(fnmatch.fnmatchcase(advisory.id, pat) for pat in self.opts.spec):            return True        if self.opts.severity and 
advisory.severity in self.opts.severity:            return True        if self.opts.bugzilla and any([advisory.match_bug(bug) for bug in self.opts.bugzilla]):            return True        if self.opts.cves and any([advisory.match_cve(cve) for cve in self.opts.cves]):            return True        if self.opts.with_cve:            if any([ref.type == hawkey.REFERENCE_CVE for ref in advisory.references]):                return True        if self.opts.with_bz:            if any([ref.type == hawkey.REFERENCE_BUGZILLA for ref in advisory.references]):                return True        return False     def _apackage_advisory_installed(self, pkgs_query, cmptype, specs):        """Return (adv. package, advisory, installed) triplets."""        for apackage in pkgs_query.get_advisory_pkgs(cmptype):            advisory = apackage.get_advisory(self.base.sack)            advisory_match = self._advisory_matcher(advisory)            apackage_match = any(fnmatch.fnmatchcase(apackage.name, pat)                                 for pat in self.opts.spec)            if advisory_match or apackage_match:                installed = self._newer_equal_installed(apackage)                yield apackage, advisory, installed     def running_kernel_pkgs(self):        """Return query containing packages of currently running kernel"""        sack = self.base.sack        q = sack.query().filterm(empty=True)        kernel = sack.get_running_kernel()        if kernel:            q = q.union(sack.query().filterm(sourcerpm=kernel.sourcerpm))        return q     def available_apkg_adv_insts(self, specs):        """Return available (adv. package, adv., inst.) 
triplets"""        # check advisories for the latest installed packages        q = self.base.sack.query().installed().latest(1)        # plus packages of the running kernel        q = q.union(self.running_kernel_pkgs().installed())        return self._apackage_advisory_installed(q, hawkey.GT, specs)     def installed_apkg_adv_insts(self, specs):        """Return installed (adv. package, adv., inst.) triplets"""        return self._apackage_advisory_installed(            self.base.sack.query().installed(), hawkey.LT | hawkey.EQ, specs)     def updating_apkg_adv_insts(self, specs):        """Return updating (adv. package, adv., inst.) triplets"""        return self._apackage_advisory_installed(            self.base.sack.query().filterm(upgradable=True), hawkey.GT, specs)     def all_apkg_adv_insts(self, specs):        """Return installed (adv. package, adv., inst.) triplets"""        return self._apackage_advisory_installed(            self.base.sack.query().installed(), hawkey.LT | hawkey.EQ | hawkey.GT, specs)     def _summary(self, apkg_adv_insts):        """Make the summary of advisories."""        # Remove duplicate advisory IDs. We assume that the ID is unique within        # a repository and two advisories with the same IDs in different        # repositories must have the same type.        id2type = {}        for (apkg, advisory, installed) in apkg_adv_insts:            id2type[advisory.id] = advisory.type            if advisory.type == hawkey.ADVISORY_SECURITY:                id2type[(advisory.id, advisory.severity)] = (advisory.type, advisory.severity)        return collections.Counter(id2type.values())     def display_summary(self, apkg_adv_insts, description):        """Display the summary of advisories."""        typ2cnt = self._summary(apkg_adv_insts)        if typ2cnt:            print(_('Updates Information Summary: ') + description)            # Convert types to strings and order the entries.            
label_counts = [                (0, _('New Package notice(s)'), typ2cnt[hawkey.ADVISORY_NEWPACKAGE]),                (0, _('Security notice(s)'), typ2cnt[hawkey.ADVISORY_SECURITY]),                (1, _('Critical Security notice(s)'),                 typ2cnt[(hawkey.ADVISORY_SECURITY, 'Critical')]),                (1, _('Important Security notice(s)'),                 typ2cnt[(hawkey.ADVISORY_SECURITY, 'Important')]),                (1, _('Moderate Security notice(s)'),                 typ2cnt[(hawkey.ADVISORY_SECURITY, 'Moderate')]),                (1, _('Medium Security notice(s)'),                 typ2cnt[(hawkey.ADVISORY_SECURITY, 'Medium')]),                (1, _('Low Security notice(s)'),                 typ2cnt[(hawkey.ADVISORY_SECURITY, 'Low')]),                (1, _('Unknown Security notice(s)'),                 typ2cnt[(hawkey.ADVISORY_SECURITY, None)]),                (0, _('Bugfix notice(s)'), typ2cnt[hawkey.ADVISORY_BUGFIX]),                (0, _('Enhancement notice(s)'), typ2cnt[hawkey.ADVISORY_ENHANCEMENT]),                (0, _('other notice(s)'), typ2cnt[hawkey.ADVISORY_UNKNOWN])]            width = _maxlen(unicode(v[2]) for v in label_counts if v[2])            for indent, label, count in label_counts:                if not count:                    continue                print('    %*s %s' % (width + 4 * indent, unicode(count), label))        if self.base.conf.autocheck_running_kernel:            self.cli._check_running_kernel()     def display_list(self, apkg_adv_insts):        """Display the list of advisories."""        def inst2mark(inst):            if not self.opts.availability == 'all':                return ''            elif inst:                return 'i '            else:                return '  '         def type2label(typ, sev):            if typ == hawkey.ADVISORY_SECURITY:                return self.SECURITY2LABEL.get(sev, _('Unknown/Sec.'))            else:                return self.TYPE2LABEL.get(typ, _('unknown'))         
nevra_inst_dict = dict()        for apkg, advisory, installed in apkg_adv_insts:            nevra = '%s-%s.%s' % (apkg.name, apkg.evr, apkg.arch)            if self.opts.with_cve or self.opts.with_bz:                for ref in advisory.references:                    if ref.type == hawkey.REFERENCE_BUGZILLA and not self.opts.with_bz:                        continue                    elif ref.type == hawkey.REFERENCE_CVE and not self.opts.with_cve:                        continue                    nevra_inst_dict.setdefault((nevra, installed, advisory.updated), dict())[ref.id] = (                        advisory.type, advisory.severity)            else:                nevra_inst_dict.setdefault((nevra, installed, advisory.updated), dict())[advisory.id] = (                    advisory.type, advisory.severity)         advlist = []        # convert types to labels, find max len of advisory IDs and types        idw = tlw = nw = 0        for (nevra, inst, aupdated), id2type in sorted(nevra_inst_dict.items(), key=lambda x: x[0]):            nw = max(nw, len(nevra))            for aid, atypesev in id2type.items():                idw = max(idw, len(aid))                typ, sev = atypesev                label = type2label(*atypesev)                # use dnf5 style for JSON output                atype = self.TYPE2LABEL.get(typ, _('unspecified'))                asev = self.SECURITY2LABEL.get(sev, _('None'))                asev = asev.split("/")[0].strip()                tlw = max(tlw, len(label))                advlist.append((inst2mark(inst), aid, label, atype, asev, nevra, aupdated))        if self.opts.json:            dtlst = []            for (inst, aid, label, atype, asev, nevra, aupdated) in advlist:                dtlst.append(                    {                        "name": aid,                        "type": atype,                        "severity": asev,                        "nevra": nevra,                        "buildtime": aupdated,                    }   
             )            print(json.dumps(dtlst, default=str, indent=2))        else:            for (inst, aid, label, atype, asev, nevra, aupdated) in advlist:                if self.base.conf.verbose:                    print('%s%-*s %-*s %-*s %s' % (inst, idw, aid, tlw, label, nw, nevra, aupdated))                else:                    print('%s%-*s %-*s %s' % (inst, idw, aid, tlw, label, nevra))     def _process_advisory(self, advisory):        """Convert DNF advisory object directly to desired format."""        advisory_id = getattr(advisory, 'id', None)         package_list = []        for pkg in getattr(advisory, 'packages', []):            if not getattr(pkg, 'name', None):                continue            pkg_info = {                'name': getattr(pkg, 'name', None),                'evr': getattr(pkg, 'evr', None),                'arch': getattr(pkg, 'arch', None),            }            pkg_str = f"{pkg_info.get('name')}-{pkg_info.get('evr')}"            if pkg_info.get('arch'):                pkg_str += f".{pkg_info.get('arch')}"            package_list.append(pkg_str)         REFERENCE_TYPES = {0: 'unknown', 1: 'bugzilla', 2: 'cve', 3: 'vendor', 4: 'security'}        references = []        for ref in getattr(advisory, 'references', []):            ref_dict = {                'Title': getattr(ref, 'title', None),                'Id': getattr(ref, 'id', None),                'Type': REFERENCE_TYPES.get(getattr(ref, 'type', 0), 'unknown'),                'Url': getattr(ref, 'href', None) or getattr(ref, 'url', None) or getattr(ref, 'link', None)            }            ref_dict = {k: v for k, v in ref_dict.items() if v is not None}            references.append(ref_dict)         result = {            advisory_id: {                'Name': advisory_id,                'Title': getattr(advisory, 'title', None),                'Severity': getattr(advisory, 'severity', 'None'),                'Type': self.TYPE2LABEL.get(getattr(advisory, 'type'), 
_('unspecified')),                'Status': getattr(advisory, 'status', None),                'Vendor': (getattr(advisory, 'vendor', None) \                           or getattr(advisory, 'author', None) \                           or getattr(advisory, 'from', None)),                'Issued': getattr(advisory, 'updated', '').strftime("%Y-%m-%d %H:%M:%S")                if getattr(advisory, 'updated', None)                else None,                'Description': getattr(advisory, 'description', None),                'Message': '',                'Rights': getattr(advisory, 'rights', None),                'references': references,                'collections': {                    'packages': package_list                }            }        }         return result      def display_info(self, apkg_adv_insts):        """Display the details about available advisories."""        arches = self.base.sack.list_arches()        verbose = self.base.conf.verbose        labels = (_('Update ID'), _('Type'), _('Updated'), _('Bugs'),                  _('CVEs'), _('Description'), _('Severity'), _('Rights'),                  _('Files'), _('Installed'))         def advisory2info(advisory, installed):            attributes = [                [advisory.id],                [self.TYPE2LABEL.get(advisory.type, _('unknown'))],                [unicode(advisory.updated)],                [],                [],                (advisory.description or '').splitlines(),                [advisory.severity],                (advisory.rights or '').splitlines(),                sorted(set(pkg.filename for pkg in advisory.packages                           if pkg.arch in arches)),                None]            for ref in advisory.references:                if ref.type == hawkey.REFERENCE_BUGZILLA:                    attributes[3].append('{} - {}'.format(ref.id, ref.title or ''))                elif ref.type == hawkey.REFERENCE_CVE:                    attributes[4].append(ref.id)            
attributes[3].sort()            attributes[4].sort()            if not verbose:                attributes[7] = None                attributes[8] = None            if self.opts.availability == 'all':                attributes[9] = [_('true') if installed else _('false')]             width = _maxlen(labels)            lines = []            lines.append('=' * 79)            lines.append('  ' + advisory.title)            lines.append('=' * 79)            for label, atr_lines in zip(labels, attributes):                if atr_lines in (None, [None]):                    continue                for i, line in enumerate(atr_lines):                    key = label if i == 0 else ''                    key_padding = width - exact_width(key)                    lines.append('%*s%s: %s' % (key_padding, "", key, line))            return '\n'.join(lines)         dt_advisories = {}        advisories = set()        for apkg, advisory, installed in apkg_adv_insts:            dt_advisories.update(self._process_advisory(advisory))            formatted_attributes = advisory2info(advisory, installed)            advisories.add(formatted_attributes)         if self.opts.json:            print(json.dumps(dt_advisories, default=str, indent=2))        else:            print("\n\n".join(sorted(advisories, key=lambda x: x.lower())))