From c791bdf5c051bb63e47457fdc0dca612412f9bf5 Mon Sep 17 00:00:00 2001
From: wang-guangge
Date: Fri, 24 Mar 2023 22:56:26 +0800
Subject: [PATCH] add dnf hot patch list plugin

---
 hotpatch/baseclass.py           | 212 ++++++++++++++++++++
 hotpatch/hotpatch.py            | 168 ++++++++++++++++
 hotpatch/hotpatch_updateinfo.py | 333 ++++++++++++++++++++++++++++++++
 3 files changed, 713 insertions(+)
 create mode 100644 hotpatch/baseclass.py
 create mode 100644 hotpatch/hotpatch.py
 create mode 100644 hotpatch/hotpatch_updateinfo.py

diff --git a/hotpatch/baseclass.py b/hotpatch/baseclass.py
new file mode 100644
index 0000000..9793c40
--- /dev/null
+++ b/hotpatch/baseclass.py
@@ -0,0 +1,212 @@
+class Hotpatch(object):
+    """
+    A hot patch package described in the hotpatch updateinfo metadata
+    """
+    __slots__ = ['_name', '_version', '_cves',
+                 '_advisory', '_arch', '_filename', '_state']
+
+    def __init__(self,
+                 name,
+                 version,
+                 arch,
+                 filename,
+                 release=''):
+        """
+        name: str, e.g. patch-redis-6.2.5-1-HP001
+        version: str
+        arch: str
+        filename: str, e.g. patch-redis-6.2.5-1-HP001-0-1.aarch64.rpm
+        release: str, accepted so that the package attributes can be passed as kwargs, not stored
+        """
+        self._name = name
+        self._version = version
+        self._arch = arch
+        self._filename = filename
+        self._cves = []
+        self._advisory = None
+        self._state = ''
+
+    @property
+    def state(self):
+        return self._state
+
+    @state.setter
+    def state(self, value):
+        self._state = value
+
+    @property
+    def name(self):
+        return self._name
+
+    @property
+    def version(self):
+        return self._version
+
+    @property
+    def src_pkg_nevre(self):
+        """
+        Split the name 'patch-<src name>-<src version>-<src release>-<hotpatch name>'
+        into (src name, src version, src release)
+
+        The split is done from the right, so a source package name that contains '-'
+        (e.g. kernel-devel) is kept intact.
+        """
+        src_pkg = self.name[self.name.index('-')+1:self.name.rindex('-')]
+        src_pkg_name, src_pkg_version, src_pkg_release = src_pkg.rsplit('-', 2)
+        return src_pkg_name, src_pkg_version, src_pkg_release
+
+    @property
+    def nevra(self):
+        """
+        nevra: name-version-release.arch
+        """
+        return self.filename[0:self.filename.rindex('.')]
+
+    @property
+    def hotpatch_name(self):
+        """
+        The part of the name after the last '-', e.g. HP001
+        """
+        return self.name[self.name.rindex('-')+1:]
+
+    @property
+    def syscare_name(self):
+        """
+        syscare name: <src name>-<src version>-<src release>/<hotpatch name>
+        """
+        src_pkg = '%s-%s-%s' % (self.src_pkg_nevre)
+        return '%s/%s' % (src_pkg, self.hotpatch_name)
+
+    @property
+    def cves(self):
+        return self._cves
+
+    @cves.setter
+    def cves(self, cves):
+        self._cves = cves
+
+    @property
+    def advisory(self):
+        return self._advisory
+
+    @advisory.setter
+    def advisory(self, advisory):
+        self._advisory = advisory
+
+    @property
+    def arch(self):
+        return self._arch
+
+    @property
+    def filename(self):
+        return self._filename
+
+
+class Cve(object):
+    """
+    A cve referenced by a hotpatch advisory
+    """
+    __slots__ = ['_cve_id', '_hotpatch']
+
+    def __init__(self,
+                 id,
+                 href='',
+                 title='',
+                 type='cve'):
+        """
+        id: str
+        href: str, accepted but not stored
+        title: str, accepted but not stored
+        type: str, accepted but not stored
+        """
+        self._cve_id = id
+        self._hotpatch = None
+
+    @property
+    def hotpatch(self):
+        return self._hotpatch
+
+    @hotpatch.setter
+    def hotpatch(self, hotpatch: Hotpatch):
+        self._hotpatch = hotpatch
+
+    @property
+    def cve_id(self):
+        return self._cve_id
+
+
+class Advisory(object):
+    """
+    A hotpatch advisory with its cves and hotpatches
+    """
+    __slots__ = ['_id', '_type', '_title', '_severity',
+                 '_description', '_updated', '_hotpatches', '_cves']
+
+    def __init__(self,
+                 id,
+                 type,
+                 title,
+                 severity,
+                 description,
+                 updated="1970-01-01 08:00:00",
+                 release="",
+                 issued=""):
+        """
+        id: str
+        type: str
+        title: str
+        severity: str
+        description: str
+        updated: str
+        release: str, accepted but not stored
+        issued: str, accepted but not stored
+        """
+        self._id = id
+        self._type = type
+        self._title = title
+        self._severity = severity
+        self._description = description
+        self._updated = updated
+        # dict {cve_id: Cve}
+        self._cves = {}
+        # list [Hotpatch]
+        self._hotpatches = []
+
+    @property
+    def id(self):
+        return self._id
+
+    @property
+    def type(self):
+        return self._type
+
+    @property
+    def title(self):
+        return self._title
+
+    @property
+    def severity(self):
+        return self._severity
+
+    @property
+    def description(self):
+        return self._description
+
+    @property
+    def updated(self):
+        return self._updated
+
+    @property
+    def cves(self):
+        return self._cves
+
+    @cves.setter
+    def cves(self, advisory_cves):
+        self._cves = advisory_cves
+
+    @property
+    def hotpatches(self):
+        return self._hotpatches
+
+    def add_hotpatch(self, hotpatch: Hotpatch):
+        self._hotpatches.append(hotpatch)
diff --git a/hotpatch/hotpatch.py b/hotpatch/hotpatch.py
new file mode 100644
index 0000000..a3ad7bb
--- /dev/null
+++ b/hotpatch/hotpatch.py
@@ -0,0 +1,168 @@
+import dnf
+from dnf.i18n import _
+from dnf.cli.commands.updateinfo import UpdateInfoCommand
+import hawkey
+from .hotpatch_updateinfo import HotpatchUpdateInfo
+
+
+@dnf.plugin.register_command
+class HotpatchCommand(dnf.cli.Command):
+    aliases = ['hotpatch']
+    summary = _('show hotpatch info')
+
+    def __init__(self, cli):
+        """
+        Initialize the command
+        """
+        super(HotpatchCommand, self).__init__(cli)
+
+    @staticmethod
+    def set_argparser(parser):
+        output_format = parser.add_mutually_exclusive_group()
+        output_format.add_argument("--list", dest='_spec_action', const='list',
+                                   action='store_const',
+                                   help=_('show list of cves'))
+
+    def configure(self):
+        demands = self.cli.demands
+        demands.sack_activation = True
+        demands.available_repos = True
+
+        # NOTE(review): opts.cves is not declared in set_argparser, presumably it comes from the global --cve option of dnf - confirm
+        self.filter_cves = self.opts.cves if self.opts.cves else None
+
+    def run(self):
+        self.hp_hawkey = HotpatchUpdateInfo(self.cli.base, self.cli)
+
+        if self.opts._spec_action == 'list':
+            self.display()
+
+    def get_mapping_nevra_cve(self) -> dict:
+        """
+        Get cve nevra mapping based on the UpdateInfoCommand of 'dnf updateinfo list cves'
+
+        Returns:
+        {
+            ((name, evr, arch), advisory.updated): {
+                cve_id: (advisory.type, advisory.severity),
+                ...
+            },
+            ...
+        }
+        """
+        # configure UpdateInfoCommand with 'dnf updateinfo list cves'
+        # the options object is shared with this command, so the assignments below also modify self.opts
+        updateinfo = UpdateInfoCommand(self.cli)
+        updateinfo.opts = self.opts
+
+        updateinfo.opts.spec_action = 'list'
+        updateinfo.opts.with_cve = True
+        updateinfo.opts.spec = '*'
+        updateinfo.opts._advisory_types = set()
+        updateinfo.opts.availability = 'available'
+        self.updateinfo = updateinfo
+
+        apkg_adv_insts = updateinfo.available_apkg_adv_insts(updateinfo.opts.spec)
+
+        mapping_nevra_cve = dict()
+        # the third item is unused, it is not named '_' so that the gettext alias is not hidden
+        for apkg, advisory, _installed in apkg_adv_insts:
+            nevra = (apkg.name, apkg.evr, apkg.arch)
+            for ref in advisory.references:
+                if ref.type != hawkey.REFERENCE_CVE:
+                    continue
+                mapping_nevra_cve.setdefault((nevra, advisory.updated), dict())[ref.id] = (
+                    advisory.type, advisory.severity)
+
+        return mapping_nevra_cve
+
+    def _is_coldpatch_fixed(self, coldpatch: tuple, fixed_coldpatches: set) -> bool:
+        """
+        Check whether the coldpatch is fixed, i.e. whether a fixed coldpatch of the same
+        package has a higher evr
+
+        Args:
+            coldpatch: (name, evr, arch)
+            fixed_coldpatches: {(name, evr, arch), ...}
+        """
+        pkg_name, pkg_evr, _pkg_arch = coldpatch
+        sack = self.cli.base.sack
+        for fixed_pkg_name, fixed_pkg_evr, _fixed_pkg_arch in fixed_coldpatches:
+            if pkg_name != fixed_pkg_name:
+                continue
+            # evr_cmp returns a positive value when the first evr is the higher one
+            if sack.evr_cmp(fixed_pkg_evr, pkg_evr) > 0:
+                return True
+        return False
+
+    def _filter_and_format_list_output(self, echo_lines: list, fixed_cve_id: set, fixed_coldpatches: set):
+        """
+        Only show specified cve information that have not been fixed, and format output
+
+        Args:
+            echo_lines: [[cve_id, type, coldpatch, hotpatch], ...], coldpatch is (name, evr, arch) or str
+            fixed_cve_id: cve ids that are skipped because they are already fixed
+            fixed_coldpatches: {(name, evr, arch), ...}, coldpatches with a lower evr are skipped
+        """
+        idw = tiw = ciw = 0
+        format_lines = set()
+        for echo_line in echo_lines:
+            cve_id, adv_type, coldpatch, hotpatch = echo_line[:4]
+            if self.filter_cves is not None and cve_id not in self.filter_cves:
+                continue
+            if cve_id in fixed_cve_id:
+                continue
+            if not isinstance(coldpatch, str):
+                if self._is_coldpatch_fixed(coldpatch, fixed_coldpatches):
+                    continue
+                pkg_name, pkg_evr, pkg_arch = coldpatch
+                coldpatch = '%s-%s.%s' % (pkg_name, pkg_evr, pkg_arch)
+
+            idw = max(idw, len(cve_id))
+            tiw = max(tiw, len(adv_type))
+            ciw = max(ciw, len(coldpatch))
+            format_lines.add((cve_id, adv_type, coldpatch, hotpatch))
+        # sort by coldpatch first, the remaining fields keep the order of equal coldpatches stable
+        for format_line in sorted(format_lines, key=lambda x: (x[2], x[0], x[1], x[3])):
+            print('%-*s %-*s %-*s %s' % (idw, format_line[0], tiw, format_line[1], ciw, format_line[2], format_line[3]))
+
+    def display(self):
+        """
+        Append hotpatch information according to the output of 'dnf updateinfo list cves'
+
+        echo lines:
+            [
+                [cve_id, type, coldpatch, hotpatch]
+            ]
+        """
+
+        def type2label(updateinfo, typ, sev):
+            if typ == hawkey.ADVISORY_SECURITY:
+                return updateinfo.SECURITY2LABEL.get(sev, _('Unknown/Sec.'))
+            else:
+                return updateinfo.TYPE2LABEL.get(typ, _('unknown'))
+
+        mapping_nevra_cve = self.get_mapping_nevra_cve()
+        echo_lines = []
+        fixed_cve_id = set()
+        fixed_coldpatches = set()
+        for (nevra, _aupdated), id2type in sorted(mapping_nevra_cve.items(), key=lambda x: x[0]):
+            pkg_name = nevra[0]
+            for cve_id, atypesev in id2type.items():
+                label = type2label(self.updateinfo, *atypesev)
+                echo_line = [cve_id, label, nevra, '-']
+                if cve_id in self.hp_hawkey.hotpatch_cves:
+                    hotpatch = self.hp_hawkey.hotpatch_cves[cve_id].hotpatch
+                    if hotpatch is not None and hotpatch.src_pkg_nevre[0] == pkg_name:
+                        if hotpatch.state == self.hp_hawkey.INSTALLED:
+                            # record the fixed cves
+                            fixed_cve_id.update(hotpatch.cves)
+                            # record the fixed coldpatch to filter the cves of the corresponding coldpatch with the lower version
+                            fixed_coldpatches.add(nevra)
+                            continue
+                        elif hotpatch.state == self.hp_hawkey.INSTALLABLE:
+                            echo_line[3] = hotpatch.nevra
+
+                echo_lines.append(echo_line)
+
+        self._filter_and_format_list_output(echo_lines, fixed_cve_id, fixed_coldpatches)
diff --git a/hotpatch/hotpatch_updateinfo.py b/hotpatch/hotpatch_updateinfo.py
new file mode 100644
index 0000000..bf04948
--- /dev/null
+++ b/hotpatch/hotpatch_updateinfo.py
@@ -0,0 +1,333 @@
+from .baseclass import Hotpatch, Cve, Advisory
+from .syscare import Syscare
+import os
+from typing import List, Optional
+import gzip
+import xml.etree.ElementTree as ET
+import datetime
+
+
+class HotpatchUpdateInfo(object):
+    """
+    Hotpatch relevant updateinfo processing
+    """
+
+    UNINSTALLABLE = 0
+    INSTALLED = 1
+    INSTALLABLE = 2
+
+    def __init__(self, base, cli):
+        self.base = base
+        self.cli = cli
+        # dict {advisory_id: [Advisory]}
+        self._hotpatch_advisories = {}
+        # dict {cve_id: Cve}
+        self._hotpatch_cves = {}
+        # list [{'Uuid': uuid, 'Name':name, 'Status': status}]
+        self._hotpatch_status = []
+        # dict {syscare_name: status}
+        self._hotpatch_state = {}
+        # query of the installed packages, set by _get_installed_pkgs
+        self._inst_pkgs_query = None
+
+        self.init_hotpatch_info()
+
+    def init_hotpatch_info(self):
+        """
+        Initialize hotpatch information
+        """
+        self._get_installed_pkgs()
+        self._parse_and_store_hotpatch_info_from_updateinfo()
+        self._init_hotpatch_status_from_syscare()
+        self._init_hotpatch_state()
+
+    @property
+    def hotpatch_cves(self):
+        return self._hotpatch_cves
+
+    @property
+    def hotpatch_status(self):
+        return self._hotpatch_status
+
+    def _get_installed_pkgs(self):
+        """
+        Get installed packages by setting the hawkey
+        """
+        sack = self.base.sack
+        # the latest installed packages
+        q = sack.query().installed().latest(1)
+        # plus packages of the running kernel
+        kernel_q = sack.query().filterm(empty=True)
+        kernel = sack.get_running_kernel()
+        if kernel:
+            kernel_q = kernel_q.union(
+                sack.query().filterm(sourcerpm=kernel.sourcerpm))
+        q = q.union(kernel_q.installed())
+        q = q.apply()
+
+        self._inst_pkgs_query = q
+
+    def _parse_and_store_hotpatch_info_from_updateinfo(self):
+        """
+        Initialize hotpatch information from repos
+        """
+        # get xxx-hotpatch.xml.gz file paths by traversing the system_cachedir(/var/cache/dnf)
+        system_cachedir = self.cli.base.conf.system_cachedir
+        all_repos = self.cli.base.repos
+        map_repo_updateinfoxml = {}
+
+        for file in os.listdir(system_cachedir):
+            file_path = os.path.join(system_cachedir, file)
+            repodata_path = os.path.join(file_path, "repodata")
+            if not os.path.isdir(repodata_path):
+                continue
+            # the cache directory is expected to be named '<repo id>-<suffix>', the repo id itself
+            # may contain '-', so only the last '-' separated field is dropped
+            repo_name = file.rsplit("-", 1)[0]
+
+            for xml_file in os.listdir(repodata_path):
+                # the hotpatch relevant updateinfo is recorded in xxx-hotpatch.xml.gz
+                if "hotpatch" in xml_file:
+                    map_repo_updateinfoxml[repo_name] = os.path.join(
+                        repodata_path, xml_file)
+
+        # only hotpatch relevant updateinfo from enabled repos are parsed and stored
+        for repo in all_repos.iter_enabled():
+            repo_id = repo.id
+            if repo_id in map_repo_updateinfoxml:
+                updateinfoxml_path = map_repo_updateinfoxml[repo_id]
+                self._parse_and_store_from_xml(updateinfoxml_path)
+
+    def _parse_pkglist(self, pkglist):
+        """
+        Parse the pkglist information, filter the hotpatches with different arches
+        """
+        hotpatches = []
+        hot_patch_collection = pkglist.find('collection')
+        arches = self.base.sack.list_arches()
+        # an Element without children is falsy, so the result of find() is compared with None
+        if hot_patch_collection is None:
+            return hotpatches
+        for package in hot_patch_collection.iter('package'):
+            hotpatch = dict(package.items())
+            if hotpatch.get('arch') not in arches:
+                continue
+            filename = package.find('filename')
+            if filename is None:
+                # a package without filename can not be turned into a Hotpatch
+                continue
+            hotpatch['filename'] = filename.text
+            hotpatches.append(hotpatch)
+        return hotpatches
+
+    def _parse_references(self, reference):
+        """
+        Parse the reference information, check whether the 'id' is missing
+        """
+        cves = []
+        for ref in reference:
+            cve = dict(ref.items())
+            if 'id' not in cve:
+                continue
+            cves.append(cve)
+        return cves
+
+    def _verify_date_str_lawyer(self, datetime_str: str) -> str:
+        """
+        Check whether the 'datetime' field is legal, if not return default value
+
+        A 10-digit string is treated as a timestamp and converted to local time
+        """
+        default_datetime_str = "1970-01-01 08:00:00"
+        # the text of an empty element is None
+        if not isinstance(datetime_str, str):
+            return default_datetime_str
+        if datetime_str.isdigit() and len(datetime_str) == 10:
+            datetime_str = datetime.datetime.fromtimestamp(
+                int(datetime_str)).strftime("%Y-%m-%d %H:%M:%S")
+        try:
+            datetime.datetime.strptime(datetime_str, '%Y-%m-%d %H:%M:%S')
+            return datetime_str
+        except ValueError:
+            return default_datetime_str
+
+    def _parse_advisory(self, update):
+        """
+        Parse the advisory information: check whether the 'datetime' field is legal, parse the 'references'
+        field and the 'pkglist' field, save 'type' information
+        """
+        advisory = {}
+        for node in update:
+            if node.tag == 'datetime':
+                advisory[node.tag] = self._verify_date_str_lawyer(
+                    update.find(node.tag).text)
+            elif node.tag == 'references':
+                advisory[node.tag] = self._parse_references(node)
+            elif node.tag == 'pkglist':
+                advisory['hotpatches'] = self._parse_pkglist(node)
+            else:
+                advisory[node.tag] = update.find(node.tag).text
+        advisory['type'] = update.get('type')
+        return advisory
+
+    def _store_advisory_info(self, advisory_kwargs: dict):
+        """
+        Instantiate Cve, Hotpatch and Advisory object according to the advisory kwargs
+
+        The kwargs left after popping 'references' and 'hotpatches' are passed to Advisory
+        as they are, a key that Advisory does not accept raises TypeError.
+        """
+        advisory_references = advisory_kwargs.pop('references', [])
+        advisory_hotpatches = advisory_kwargs.pop('hotpatches', [])
+        advisory = Advisory(**advisory_kwargs)
+        advisory_cves = {}
+        for cve_kwargs in advisory_references:
+            cve = Cve(**cve_kwargs)
+            self._hotpatch_cves[cve.cve_id] = cve
+            advisory_cves[cve.cve_id] = cve
+        advisory.cves = advisory_cves
+
+        for hotpatch_kwargs in advisory_hotpatches:
+            hotpatch = Hotpatch(**hotpatch_kwargs)
+            hotpatch.advisory = advisory
+            hotpatch.cves = advisory_cves.keys()
+
+            advisory.add_hotpatch(hotpatch)
+
+            # each cve ends up referring to the last hotpatch of the advisory
+            for cve in advisory_cves.values():
+                cve.hotpatch = hotpatch
+
+        self._hotpatch_advisories.setdefault(
+            advisory_kwargs['id'], list()).append(advisory)
+
+    def _init_hotpatch_state(self):
+        """
+        Initialize the hotpatch state
+
+        each hotpatch has three states:
+        1. UNINSTALLABLE: can not be installed due to the source package version mismatch
+        2. INSTALLED: has been installed and activated in syscare
+        3. INSTALLABLE: can be installed
+        """
+        for advisories in self._hotpatch_advisories.values():
+            for advisory in advisories:
+                for hotpatch in advisory.hotpatches:
+                    src_pkg_name, src_pkg_version, src_pkg_release = hotpatch.src_pkg_nevre
+                    inst_pkgs = self._inst_pkgs_query.filter(name=src_pkg_name)
+                    hotpatch.state = self.UNINSTALLABLE
+                    # check whether the relevant source package is installed on this machine
+                    if not inst_pkgs:
+                        continue
+                    hp_vere = '%s-%s' % (src_pkg_version, src_pkg_release)
+                    for inst_pkg in inst_pkgs:
+                        inst_pkg_vere = '%s-%s' % (inst_pkg.version,
+                                                   inst_pkg.release)
+                        if hp_vere != inst_pkg_vere:
+                            continue
+                        elif self._get_hotpatch_status_in_syscare(hotpatch) == 'ACTIVED':
+                            hotpatch.state = self.INSTALLED
+                        else:
+                            hotpatch.state = self.INSTALLABLE
+
+    def _parse_and_store_from_xml(self, updateinfoxml):
+        """
+        Parse and store hotpatch update information from xxx-hotpatch.xml.gz
+
+        xxx-hotpatch.xml.gz e.g. (only elements read by the parser are shown, attribute values are illustrative)
+
+        <updates>
+            <update type="security">
+                <id>openEuler-SA-2022-1</id>
+                <title>An update for mariadb is now available for openEuler-22.03-LTS</title>
+                <severity>Important</severity>
+                <release>openEuler</release>
+                <references>
+                    <reference id="CVE-2022-24048" type="cve"/>
+                </references>
+                <description>patch-redis-6.2.5-1-HP001.(CVE-2022-24048)</description>
+                <pkglist>
+                    <collection>
+                        <name>openEuler</name>
+                        <package arch="aarch64" name="patch-redis-6.2.5-1-HP001" release="1" version="0">
+                            <filename>patch-redis-6.2.5-1-HP001-0-1.aarch64.rpm</filename>
+                        </package>
+                        <package arch="x86_64" name="patch-redis-6.2.5-1-HP001" release="1" version="0">
+                            <filename>patch-redis-6.2.5-1-HP001-0-1.x86_64.rpm</filename>
+                        </package>
+                    </collection>
+                </pkglist>
+            </update>
+            ...
+        </updates>
+        """
+        # the context manager closes the gzip file even if the xml can not be parsed
+        with gzip.open(updateinfoxml) as content:
+            root = ET.parse(content).getroot()
+        for update in root.iter('update'):
+            advisory = self._parse_advisory(update)
+            self._store_advisory_info(advisory)
+
+    def _init_hotpatch_status_from_syscare(self):
+        """
+        Initialize hotpatch status from syscare
+        """
+        self._hotpatch_status = Syscare().list()
+
+        self._hotpatch_state = {}
+        for hotpatch_info in self._hotpatch_status:
+            self._hotpatch_state[hotpatch_info['Name']] = hotpatch_info['Status']
+
+    def _get_hotpatch_status_in_syscare(self, hotpatch: Hotpatch) -> str:
+        """
+        Get hotpatch status in syscare, '' if syscare does not list the hotpatch
+        """
+        return self._hotpatch_state.get(hotpatch.syscare_name, '')
+
+    def get_hotpatches_from_cve(self, cves: Optional[List[str]] = None) -> dict:
+        """
+        Get hotpatches from specified cve
+
+        Args:
+            cves: [cve_id_1, cve_id_2]
+
+        Returns:
+        {
+            cve_id_1: [hotpatch1],
+            cve_id_2: []
+        }
+        """
+        mapping_cve_hotpatches = dict()
+        for cve_id in cves or []:
+            mapping_cve_hotpatches[cve_id] = []
+            if cve_id not in self.hotpatch_cves:
+                continue
+            hotpatch = self.hotpatch_cves[cve_id].hotpatch
+            if hotpatch is not None and hotpatch.state == self.INSTALLABLE:
+                mapping_cve_hotpatches[cve_id].append(hotpatch.nevra)
+        return mapping_cve_hotpatches
+
+    def get_hotpatches_from_advisories(self, advisories: Optional[List[str]] = None) -> dict:
+        """
+        Get hotpatches from specified advisories
+
+        Args:
+            advisories: [advisory_id_1, advisory_id_2]
+
+        Return:
+        {
+            advisory_id_1: [hotpatch1],
+            advisory_id_2: []
+        }
+        """
+        mapping_advisory_hotpatches = dict()
+        for advisory_id in advisories or []:
+            mapping_advisory_hotpatches[advisory_id] = []
+            if advisory_id not in self._hotpatch_advisories:
+                continue
+            for advisory in self._hotpatch_advisories[advisory_id]:
+                for hotpatch in advisory.hotpatches:
+                    if hotpatch.state == self.INSTALLABLE:
+                        mapping_advisory_hotpatches[advisory_id].append(
+                            hotpatch.nevra)
+        return mapping_advisory_hotpatches
-- 
2.33.0