713 lines
24 KiB
Diff
713 lines
24 KiB
Diff
From c791bdf5c051bb63e47457fdc0dca612412f9bf5 Mon Sep 17 00:00:00 2001
|
|
From: wang-guangge <wangguangge@huawei.com>
|
|
Date: Fri, 24 Mar 2023 22:56:26 +0800
|
|
Subject: [PATCH] add dnf hot patch list plugin
|
|
|
|
---
|
|
hotpatch/baseclass.py | 191 +++++++++++++++++++
|
|
hotpatch/hotpatch.py | 164 ++++++++++++++++
|
|
hotpatch/hotpatch_updateinfo.py | 322 ++++++++++++++++++++++++++++++++
|
|
3 files changed, 677 insertions(+)
|
|
create mode 100644 hotpatch/baseclass.py
|
|
create mode 100644 hotpatch/hotpatch.py
|
|
create mode 100644 hotpatch/hotpatch_updateinfo.py
|
|
|
|
diff --git a/hotpatch/baseclass.py b/hotpatch/baseclass.py
|
|
new file mode 100644
|
|
index 0000000..9793c40
|
|
--- /dev/null
|
|
+++ b/hotpatch/baseclass.py
|
|
@@ -0,0 +1,191 @@
|
|
class Hotpatch(object):
    """
    A hotpatch package taken from a <package> entry of the hotpatch updateinfo.

    The hotpatch name is expected to follow the layout
    'patch-<src_pkg_name>-<src_pkg_version>-<src_pkg_release>-<hotpatch_name>',
    e.g. 'patch-redis-6.2.5-1-HP001'.
    """
    __slots__ = ['_name', '_version', '_cves',
                 '_advisory', '_arch', '_filename', '_state']

    def __init__(self,
                 name,
                 version,
                 arch,
                 filename,
                 release=''):
        """
        name: str
        version: str
        arch: str
        filename: str
        release: str, accepted so that the object can be built from keyword
                 arguments, but not stored
        """
        self._name = name
        self._version = version
        self._arch = arch
        self._filename = filename
        self._cves = []
        self._advisory = None
        self._state = ''

    @property
    def state(self):
        return self._state

    @state.setter
    def state(self, value):
        self._state = value

    @property
    def name(self):
        return self._name

    @property
    def version(self):
        return self._version

    @property
    def src_pkg_nevre(self):
        """
        Name, version and release of the source package targeted by the hotpatch.

        The text between the first and the last '-' of the hotpatch name is split
        from the right, so a source package name that itself contains '-'
        (e.g. 'python3-foo') is kept in one piece.

        Returns:
            tuple: (src_pkg_name, src_pkg_version, src_pkg_release)

        Raises:
            ValueError: the hotpatch name does not follow the expected layout
        """
        src_pkg = self.name[self.name.index('-') + 1:self.name.rindex('-')]
        parts = src_pkg.rsplit('-', 2)
        if len(parts) != 3:
            raise ValueError('invalid hotpatch name: %s' % self.name)
        src_pkg_name, src_pkg_version, src_pkg_release = parts
        return src_pkg_name, src_pkg_version, src_pkg_release

    @property
    def nevra(self):
        """
        nevra: name-version-release.arch

        It is the filename without its last extension.
        """
        return self.filename[0:self.filename.rindex('.')]

    @property
    def hotpatch_name(self):
        """
        The text after the last '-' of the name, e.g. 'HP001'
        """
        return self.name[self.name.rindex('-') + 1:]

    @property
    def syscare_name(self):
        """
        '<src_pkg_name>-<src_pkg_version>-<src_pkg_release>/<hotpatch_name>'
        """
        src_pkg = '%s-%s-%s' % self.src_pkg_nevre
        return '%s/%s' % (src_pkg, self.hotpatch_name)

    @property
    def cves(self):
        return self._cves

    @cves.setter
    def cves(self, cves):
        self._cves = cves

    @property
    def advisory(self):
        return self._advisory

    @advisory.setter
    def advisory(self, advisory):
        self._advisory = advisory

    @property
    def arch(self):
        return self._arch

    @property
    def filename(self):
        return self._filename
|
|
+
|
|
+
|
|
class Cve(object):
    """
    A cve referenced by a hotpatch advisory, linked to the hotpatch that fixes it.
    """
    __slots__ = ['_cve_id', '_hotpatch']

    def __init__(self, id, href='', title='', type='cve'):
        """
        id: str, the cve id
        href: str, accepted but not stored
        title: str, accepted but not stored
        type: str, accepted but not stored
        """
        self._hotpatch = None
        self._cve_id = id

    @property
    def cve_id(self):
        return self._cve_id

    @property
    def hotpatch(self):
        return self._hotpatch

    @hotpatch.setter
    def hotpatch(self, hotpatch: "Hotpatch"):
        self._hotpatch = hotpatch
|
|
+
|
|
+
|
|
class Advisory(object):
    """
    A hotpatch advisory together with the cves and hotpatches attached to it.
    """
    __slots__ = ['_id', '_type', '_title', '_severity',
                 '_description', '_updated', '_hotpatches', '_cves']

    def __init__(self,
                 id,
                 type,
                 title,
                 severity,
                 description,
                 updated="1970-01-01 08:00:00",
                 release="",
                 issued=""):
        """
        id: str
        type: str
        title: str
        severity: str
        description: str
        updated: str
        release: str, accepted but not stored
        issued: str, accepted but not stored
        """
        # containers filled in after construction
        self._hotpatches = []
        self._cves = {}
        # plain advisory fields
        self._id, self._type, self._title = id, type, title
        self._severity, self._description = severity, description
        self._updated = updated

    def add_hotpatch(self, hotpatch: "Hotpatch"):
        """
        Append a hotpatch to the hotpatches of this advisory
        """
        self._hotpatches.append(hotpatch)

    @property
    def hotpatches(self):
        return self._hotpatches

    @property
    def cves(self):
        return self._cves

    @cves.setter
    def cves(self, advisory_cves):
        self._cves = advisory_cves

    @property
    def updated(self):
        return self._updated

    @property
    def description(self):
        return self._description

    @property
    def severity(self):
        return self._severity

    @property
    def title(self):
        return self._title

    @property
    def type(self):
        return self._type

    @property
    def id(self):
        return self._id
|
|
+
|
|
diff --git a/hotpatch/hotpatch.py b/hotpatch/hotpatch.py
|
|
new file mode 100644
|
|
index 0000000..a3ad7bb
|
|
--- /dev/null
|
|
+++ b/hotpatch/hotpatch.py
|
|
@@ -0,0 +1,164 @@
|
|
+import dnf
|
|
+from dnf.i18n import _
|
|
+from dnf.cli.commands.updateinfo import UpdateInfoCommand
|
|
+import hawkey
|
|
+from .hotpatch_updateinfo import HotpatchUpdateInfo
|
|
+
|
|
@dnf.plugin.register_command
class HotpatchCommand(dnf.cli.Command):
    """
    dnf command 'hotpatch'.

    With '--list' it prints, for every unfixed cve, the cold patch that fixes it
    and the hot patch that can be installed for it ('-' when there is none).
    """
    aliases = ['hotpatch']
    summary = _('show hotpatch info')

    def __init__(self, cli):
        """
        Initialize the command
        """
        super(HotpatchCommand, self).__init__(cli)
        # set by configure(): cve ids to show, None means no filtering
        self.filter_cves = None
        # set by run(): HotpatchUpdateInfo instance
        self.hp_hawkey = None
        # set by get_mapping_nevra_cve(): the UpdateInfoCommand used for the query
        self.updateinfo = None

    @staticmethod
    def set_argparser(parser):
        output_format = parser.add_mutually_exclusive_group()
        output_format.add_argument("--list", dest='_spec_action', const='list',
                                   action='store_const',
                                   help=_('show list of cves'))

    def configure(self):
        demands = self.cli.demands
        demands.sack_activation = True
        demands.available_repos = True

        # an empty cve option is treated as "do not filter"
        self.filter_cves = self.opts.cves if self.opts.cves else None

    def run(self):
        self.hp_hawkey = HotpatchUpdateInfo(self.cli.base, self.cli)

        if self.opts._spec_action == 'list':
            self.display()

    def get_mapping_nevra_cve(self) -> dict:
        """
        Get cve nevra mapping based on the UpdateInfoCommand of 'dnf updateinfo list cves'

        Note that the options of this command are reused (and modified) for the
        UpdateInfoCommand.

        Returns:
        {
            ((name, evr, arch), advisory.updated): {
                cve_id: (advisory.type, advisory.severity),
                ...
            },
            ...
        }
        """
        # configure UpdateInfoCommand with 'dnf updateinfo list cves'
        updateinfo = UpdateInfoCommand(self.cli)
        updateinfo.opts = self.opts

        updateinfo.opts.spec_action = 'list'
        updateinfo.opts.with_cve = True
        updateinfo.opts.spec = '*'
        updateinfo.opts._advisory_types = set()
        updateinfo.opts.availability = 'available'
        self.updateinfo = updateinfo

        apkg_adv_insts = updateinfo.available_apkg_adv_insts(updateinfo.opts.spec)

        mapping_nevra_cve = dict()
        # the third item is unused; it is not named '_' to keep gettext's '_' intact
        for apkg, advisory, _installed in apkg_adv_insts:
            nevra = (apkg.name, apkg.evr, apkg.arch)
            for ref in advisory.references:
                if ref.type != hawkey.REFERENCE_CVE:
                    continue
                mapping_nevra_cve.setdefault((nevra, advisory.updated), dict())[ref.id] = (
                    advisory.type, advisory.severity)

        return mapping_nevra_cve

    def _filter_and_format_list_output(self, echo_lines: list, fixed_cve_id: set, fixed_coldpatches: set):
        """
        Only show specified cve information that have not been fixed, and format output

        Args:
            echo_lines: [[cve_id, type, coldpatch, hotpatch], ...], coldpatch is either
                        a (name, evr, arch) tuple or an already formatted str
            fixed_cve_id: cve ids that are fixed by an installed hotpatch
            fixed_coldpatches: (name, evr, arch) tuples of the coldpatches whose
                               hotpatch is installed
        """
        evr_cmp = self.cli.base.sack.evr_cmp

        def is_patch_fixed(coldpatch, fixed_coldpatches):
            """
            Check whether the coldpatch is fixed, i.e. whether a fixed coldpatch of
            the same package with a higher evr exists
            """
            pkg_name, pkg_evr, _pkg_arch = coldpatch
            for fixed_pkg_name, fixed_pkg_evr, _fixed_pkg_arch in fixed_coldpatches:
                if pkg_name != fixed_pkg_name:
                    continue
                # evr_cmp returns a positive integer when the first evr is the higher one
                if evr_cmp(fixed_pkg_evr, pkg_evr) > 0:
                    return True
            return False

        idw = tiw = ciw = 0
        format_lines = set()
        for echo_line in echo_lines:
            cve_id, label, coldpatch, hotpatch = echo_line[0], echo_line[1], echo_line[2], echo_line[3]
            if self.filter_cves is not None and cve_id not in self.filter_cves:
                continue
            if cve_id in fixed_cve_id:
                continue
            if not isinstance(coldpatch, str):
                if is_patch_fixed(coldpatch, fixed_coldpatches):
                    continue
                pkg_name, pkg_evr, pkg_arch = coldpatch
                coldpatch = '%s-%s.%s' % (pkg_name, pkg_evr, pkg_arch)

            idw = max(idw, len(cve_id))
            tiw = max(tiw, len(label))
            ciw = max(ciw, len(coldpatch))
            format_lines.add((cve_id, label, coldpatch, hotpatch))

        # the cve id is a secondary sort key so that the lines of one coldpatch are
        # printed in a stable order
        for format_line in sorted(format_lines, key=lambda x: (x[2], x[0])):
            print('%-*s %-*s %-*s %s' % (idw, format_line[0], tiw, format_line[1], ciw, format_line[2], format_line[3]))

    def display(self):
        """
        Append hotpatch information according to the output of 'dnf updateinfo list cves'

        echo lines:
        [
            [cve_id, type, coldpatch, hotpatch]
        ]
        """

        def type2label(updateinfo, typ, sev):
            if typ == hawkey.ADVISORY_SECURITY:
                return updateinfo.SECURITY2LABEL.get(sev, _('Unknown/Sec.'))
            else:
                return updateinfo.TYPE2LABEL.get(typ, _('unknown'))

        mapping_nevra_cve = self.get_mapping_nevra_cve()
        echo_lines = []
        fixed_cve_id = set()
        fixed_coldpatches = set()
        for (nevra, _aupdated), id2type in sorted(mapping_nevra_cve.items(), key=lambda x: x[0]):
            pkg_name = nevra[0]
            for cve_id, atypesev in id2type.items():
                label = type2label(self.updateinfo, *atypesev)
                echo_line = [cve_id, label, nevra, '-']
                if cve_id in self.hp_hawkey.hotpatch_cves:
                    hotpatch = self.hp_hawkey.hotpatch_cves[cve_id].hotpatch
                    if hotpatch is not None and hotpatch.src_pkg_nevre[0] == pkg_name:
                        if hotpatch.state == self.hp_hawkey.INSTALLED:
                            # record the fixed cves
                            fixed_cve_id.update(hotpatch.cves)
                            # record the fixed coldpatch to filter the cves of the corresponding coldpatch with the lower version
                            fixed_coldpatches.add(nevra)
                            continue
                        elif hotpatch.state == self.hp_hawkey.INSTALLABLE:
                            echo_line[3] = hotpatch.nevra

                echo_lines.append(echo_line)

        self._filter_and_format_list_output(echo_lines, fixed_cve_id, fixed_coldpatches)
|
|
diff --git a/hotpatch/hotpatch_updateinfo.py b/hotpatch/hotpatch_updateinfo.py
|
|
new file mode 100644
|
|
index 0000000..bf04948
|
|
--- /dev/null
|
|
+++ b/hotpatch/hotpatch_updateinfo.py
|
|
@@ -0,0 +1,322 @@
|
|
+from .baseclass import Hotpatch, Cve, Advisory
|
|
+from .syscare import Syscare
|
|
+import os
|
|
+from typing import Optional
|
|
+import gzip
|
|
+import xml.etree.ElementTree as ET
|
|
+import datetime
|
|
+
|
|
class HotpatchUpdateInfo(object):
    """
    Hotpatch relevant updateinfo processing
    """

    UNINSTALLABLE = 0
    INSTALLED = 1
    INSTALLABLE = 2

    # value returned when the 'datetime' field of an advisory is illegal
    _DEFAULT_DATETIME = "1970-01-01 08:00:00"
    _DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S"

    def __init__(self, base, cli):
        self.base = base
        self.cli = cli
        # dict {advisory_id: [Advisory, ...]}
        self._hotpatch_advisories = {}
        # dict {cve_id: Cve}
        self._hotpatch_cves = {}
        # list [{'Uuid': uuid, 'Name':name, 'Status': status}]
        self._hotpatch_status = []
        # dict {hotpatch name in syscare: status}
        self._hotpatch_state = {}
        # query of the installed packages, set by _get_installed_pkgs()
        self._inst_pkgs_query = None

        self.init_hotpatch_info()

    def init_hotpatch_info(self):
        """
        Initialize hotpatch information
        """
        self._get_installed_pkgs()
        self._parse_and_store_hotpatch_info_from_updateinfo()
        self._init_hotpatch_status_from_syscare()
        self._init_hotpatch_state()

    @property
    def hotpatch_cves(self):
        return self._hotpatch_cves

    @property
    def hotpatch_status(self):
        return self._hotpatch_status

    def _get_installed_pkgs(self):
        """
        Get installed packages by setting the hawkey
        """
        sack = self.base.sack
        # the latest installed packages
        q = sack.query().installed().latest(1)
        # plus packages of the running kernel
        kernel_q = sack.query().filterm(empty=True)
        kernel = sack.get_running_kernel()
        if kernel:
            kernel_q = kernel_q.union(
                sack.query().filterm(sourcerpm=kernel.sourcerpm))
        q = q.union(kernel_q.installed())
        q = q.apply()

        self._inst_pkgs_query = q

    @staticmethod
    def _repo_id_from_cachedir(dirname: str) -> str:
        """
        Get the repo id from the name of a repo cache directory

        The name is expected to be '<repo id>-<hash>'; only the last '-' separated
        part is dropped, because the repo id itself may contain '-'.
        """
        return dirname.rsplit("-", 1)[0]

    def _parse_and_store_hotpatch_info_from_updateinfo(self):
        """
        Initialize hotpatch information from repos
        """
        # get xxx-hotpatch.xml.gz file paths by traversing the system_cachedir(/var/cache/dnf)
        system_cachedir = self.cli.base.conf.system_cachedir
        all_repos = self.cli.base.repos
        map_repo_updateinfoxml = {}

        # without a cache directory there is no hotpatch updateinfo to load
        if not os.path.isdir(system_cachedir):
            return

        for cache_dir in os.listdir(system_cachedir):
            repodata_path = os.path.join(system_cachedir, cache_dir, "repodata")
            if not os.path.isdir(repodata_path):
                continue

            for xml_file in os.listdir(repodata_path):
                # the hotpatch relevant updateinfo is recorded in xxx-hotpatch.xml.gz
                if "hotpatch" in xml_file:
                    repo_name = self._repo_id_from_cachedir(cache_dir)
                    map_repo_updateinfoxml[repo_name] = os.path.join(
                        repodata_path, xml_file)

        # only hotpatch relevant updateinfo from enabled repos are parsed and stored
        for repo in all_repos.iter_enabled():
            repo_id = repo.id
            if repo_id in map_repo_updateinfoxml:
                self._parse_and_store_from_xml(map_repo_updateinfoxml[repo_id])

    def _parse_pkglist(self, pkglist):
        """
        Parse the pkglist information, filter the hotpatches with different arches

        Packages without an 'arch' attribute or a 'filename' node are skipped.

        Returns:
            list of dict: the attributes of each kept <package> plus its 'filename'
        """
        hotpatches = []
        hot_patch_collection = pkglist.find('collection')
        # compare with None: the truth value of an element depends on its children
        if hot_patch_collection is None:
            return hotpatches
        arches = self.base.sack.list_arches()
        for package in hot_patch_collection.iter('package'):
            hotpatch = dict(package.items())
            if hotpatch.get('arch') not in arches:
                continue
            filename = package.find('filename')
            if filename is None:
                continue
            hotpatch['filename'] = filename.text
            hotpatches.append(hotpatch)
        return hotpatches

    def _parse_references(self, reference):
        """
        Parse the reference information, check whether the 'id' is missing

        Returns:
            list of dict: the attributes of each reference that has an 'id'
        """
        cves = []
        for ref in reference:
            cve = dict(ref.items())
            if 'id' not in cve:
                continue
            cves.append(cve)
        return cves

    def _verify_date_str_lawyer(self, datetime_str: str) -> str:
        """
        Check whether the 'datetime' field is legal, if not return default value

        A 10-digit string is treated as a timestamp and converted (in local time) to
        'YYYY-mm-dd HH:MM:SS'. A missing or empty value is illegal.
        """
        if not datetime_str:
            return self._DEFAULT_DATETIME
        if datetime_str.isdigit() and len(datetime_str) == 10:
            datetime_str = datetime.datetime.fromtimestamp(
                int(datetime_str)).strftime(self._DATETIME_FORMAT)
        try:
            datetime.datetime.strptime(datetime_str, self._DATETIME_FORMAT)
        except ValueError:
            return self._DEFAULT_DATETIME
        return datetime_str

    def _parse_advisory(self, update):
        """
        Parse the advisory information: check whether the 'datetime' field is legal, parse the 'references'
        field and the 'pkglist' field, save 'type' information

        Returns:
            dict: one entry per child tag of the update ('pkglist' is stored as
                  'hotpatches'), plus the 'type' attribute of the update
        """
        advisory = {}
        # when a plain text tag is repeated, the first occurrence is the one kept
        text_tags = set()
        for node in update:
            if node.tag == 'references':
                advisory[node.tag] = self._parse_references(node)
            elif node.tag == 'pkglist':
                advisory['hotpatches'] = self._parse_pkglist(node)
            elif node.tag in text_tags:
                continue
            elif node.tag == 'datetime':
                text_tags.add(node.tag)
                advisory[node.tag] = self._verify_date_str_lawyer(node.text)
            else:
                text_tags.add(node.tag)
                advisory[node.tag] = node.text
        advisory['type'] = update.get('type')
        return advisory

    def _store_advisory_info(self, advisory_kwargs: dict):
        """
        Instantiate Cve, Hotpatch and Advisory object according to the advisory kwargs

        An advisory without 'references' or 'hotpatches' is stored without cves or
        hotpatches.
        """
        advisory_references = advisory_kwargs.pop('references', [])
        advisory_hotpatches = advisory_kwargs.pop('hotpatches', [])
        # NOTE(review): every remaining key is passed on as a keyword argument, so a
        # tag that Advisory does not accept makes this call fail - confirm whether
        # unknown tags should be dropped instead
        advisory = Advisory(**advisory_kwargs)
        advisory_cves = {}
        for cve_kwargs in advisory_references:
            cve = Cve(**cve_kwargs)
            self._hotpatch_cves[cve.cve_id] = cve
            advisory_cves[cve.cve_id] = cve
        advisory.cves = advisory_cves

        for hotpatch_kwargs in advisory_hotpatches:
            hotpatch = Hotpatch(**hotpatch_kwargs)
            hotpatch.advisory = advisory
            hotpatch.cves = advisory_cves.keys()

            advisory.add_hotpatch(hotpatch)

            # each cve ends up linked to the last hotpatch of the advisory
            for cve in advisory_cves.values():
                cve.hotpatch = hotpatch

        self._hotpatch_advisories.setdefault(
            advisory_kwargs['id'], list()).append(advisory)

    def _init_hotpatch_state(self):
        """
        Initialize the hotpatch state

        each hotpatch has three states:
        1. UNINSTALLABLE: can not be installed due to the source package version mismatch
        2. INSTALLED: has been installed and actived in syscare
        3. INSTALLABLE: can be installed

        """
        for advisories in self._hotpatch_advisories.values():
            for advisory in advisories:
                for hotpatch in advisory.hotpatches:
                    src_pkg_name, src_pkg_version, src_pkg_release = hotpatch.src_pkg_nevre
                    inst_pkgs = self._inst_pkgs_query.filter(name=src_pkg_name)
                    hotpatch.state = self.UNINSTALLABLE
                    # check whether the relevant source package is installed on this machine
                    if not inst_pkgs:
                        continue
                    hp_vere = '%s-%s' % (src_pkg_version, src_pkg_release)
                    for inst_pkg in inst_pkgs:
                        inst_pkg_vere = '%s-%s' % (inst_pkg.version,
                                                   inst_pkg.release)
                        if hp_vere != inst_pkg_vere:
                            continue
                        elif self._get_hotpatch_status_in_syscare(hotpatch) == 'ACTIVED':
                            hotpatch.state = self.INSTALLED
                        else:
                            hotpatch.state = self.INSTALLABLE

    def _parse_and_store_from_xml(self, updateinfoxml):
        """
        Parse and store hotpatch update information from xxx-hotpatch.xml.gz

        xxx-hotpatch.xml.gz e.g.

        <?xml version="1.0" encoding="UTF-8"?>
        <updates>
            <update from="openeuler.org" type="security" status="stable">
                <id>openEuler-SA-2022-1</id>
                <title>An update for mariadb is now available for openEuler-22.03-LTS</title>
                <severity>Important</severity>
                <release>openEuler</release>
                <issued date="2022-04-16"></issued>
                <references>
                    <reference href="https://nvd.nist.gov/vuln/detail/CVE-2021-46658" id="CVE-2021-1" title="CVE-2021-1" type="cve"></reference>
                </references>
                <description>patch-redis-6.2.5-1-HP001.(CVE-2022-24048)</description>
                <pkglist>
                    <collection>
                        <name>openEuler</name>
                        <package arch="aarch64" name="patch-redis-6.2.5-1-HP001" release="0" version="1">
                            <filename>patch-redis-6.2.5-1-HP001-0-1.aarch64.rpm</filename>
                        </package>
                        <package arch="x86_64" name="patch-redis-6.2.5-1-HP001" release="0" version="1">
                            <filename>patch-redis-6.2.5-1-HP001-0-1.x86_64.rpm</filename>
                        </package>
                    </collection>
                </pkglist>
            </update>
            ...
        </updates>
        """
        # the context manager closes the gzip file once the tree has been read
        with gzip.open(updateinfoxml) as content:
            tree = ET.parse(content)
        root = tree.getroot()
        for update in root.iter('update'):
            advisory = self._parse_advisory(update)
            self._store_advisory_info(advisory)

    def _init_hotpatch_status_from_syscare(self):
        """
        Initialize hotpatch status from syscare
        """
        self._hotpatch_status = Syscare().list()

        self._hotpatch_state = {}
        for hotpatch_info in self._hotpatch_status:
            self._hotpatch_state[hotpatch_info['Name']] = hotpatch_info['Status']

    def _get_hotpatch_status_in_syscare(self, hotpatch: "Hotpatch") -> str:
        """
        Get hotpatch status in syscare, '' when syscare does not list the hotpatch
        """
        return self._hotpatch_state.get(hotpatch.syscare_name, '')

    def get_hotpatches_from_cve(self, cves: Optional[list[str]] = None) -> dict:
        """
        Get hotpatches from specified cve

        Args:
            cves: [cve_id_1, cve_id_2], None is treated as an empty list

        Returns:
        {
            cve_id_1: [hotpatch1],
            cve_id_2: []
        }
        """
        mapping_cve_hotpatches = dict()
        for cve_id in cves or []:
            mapping_cve_hotpatches[cve_id] = []
            if cve_id not in self.hotpatch_cves:
                continue
            hotpatch = self.hotpatch_cves[cve_id].hotpatch
            if hotpatch is not None and hotpatch.state == self.INSTALLABLE:
                mapping_cve_hotpatches[cve_id].append(hotpatch.nevra)
        return mapping_cve_hotpatches

    def get_hotpatches_from_advisories(self, advisories: Optional[list[str]] = None) -> dict:
        """
        Get hotpatches from specified advisories

        Args:
            advisories: [advisory_id_1, advisory_id_2], None is treated as an empty list

        Return:
        {
            advisory_id_1: [hotpatch1],
            advisory_id_2: []
        }
        """
        mapping_advisory_hotpatches = dict()
        for advisory_id in advisories or []:
            mapping_advisory_hotpatches[advisory_id] = []
            if advisory_id not in self._hotpatch_advisories:
                continue
            for advisory in self._hotpatch_advisories[advisory_id]:
                for hotpatch in advisory.hotpatches:
                    if hotpatch.state == self.INSTALLABLE:
                        mapping_advisory_hotpatches[advisory_id].append(
                            hotpatch.nevra)
        return mapping_advisory_hotpatches
|
|
+
|
|
+
|
|
--
|
|
2.33.0
|
|
|