From d7d97d24c0b2b7e0f5686fc9f9bd1674c94c53eb Mon Sep 17 00:00:00 2001 From: rabbitali Date: Mon, 4 Sep 2023 17:06:06 +0800 Subject: [PATCH] adapted to cve rollback and update return value MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ceres/conf/constant.py | 5 ++ ceres/function/status.py | 2 + ceres/manages/vulnerability_manage.py | 114 ++++++++++++++------------ 3 files changed, 68 insertions(+), 53 deletions(-) diff --git a/ceres/conf/constant.py b/ceres/conf/constant.py index 8c7723b..79594ae 100644 --- a/ceres/conf/constant.py +++ b/ceres/conf/constant.py @@ -70,3 +70,8 @@ REGISTER_HELP_INFO = """ class CommandExitCode: SUCCEED = 0 FAIL = 255 + + +class TaskExecuteRes: + SUCCEED = "succeed" + FAIL = "fail" diff --git a/ceres/function/status.py b/ceres/function/status.py index 70fc212..d1fe088 100644 --- a/ceres/function/status.py +++ b/ceres/function/status.py @@ -25,6 +25,7 @@ REPO_CONTENT_INCORRECT = "Repo.Content.Incorrect" REPO_NOT_SET = "Repo.Not.Set" NO_COMMAND = "No.Command" NOT_PATCH = "Not.Patch" +PRE_CHECK_ERROR = "Pre.Check.Error" COMMAND_EXEC_ERROR = "Command.Error" @@ -49,6 +50,7 @@ class StatusCode: NO_COMMAND: {"msg": "command not found"}, NOT_PATCH: {"msg": "no valid hot patch is matched"}, COMMAND_EXEC_ERROR: {"msg": "the input command is incorrect"}, + PRE_CHECK_ERROR: {"msg": "Preset check item detection failed"}, } @classmethod diff --git a/ceres/manages/vulnerability_manage.py b/ceres/manages/vulnerability_manage.py index 747df61..20bfe1e 100644 --- a/ceres/manages/vulnerability_manage.py +++ b/ceres/manages/vulnerability_manage.py @@ -16,19 +16,10 @@ import json from collections import defaultdict from typing import Dict, List, Tuple -from ceres.conf.constant import REPO_ID_FOR_CVE_MANAGE, CommandExitCode +from ceres.conf.constant import REPO_ID_FOR_CVE_MANAGE, CommandExitCode, TaskExecuteRes from ceres.function.check import PreCheck from ceres.function.log import LOGGER -from ceres.function.status import ( - FAIL, - NOT_PATCH, - PARAM_ERROR, - REPO_CONTENT_INCORRECT, - REPO_NOT_SET, - SUCCESS, - StatusCode, - COMMAND_EXEC_ERROR, -) +from ceres.function.status import * from ceres.function.util import execute_shell_command @@ -131,7 +122,7 @@ class VulnerabilityManage: cve_scan_result["check_items"] = items_check_log if not check_result: LOGGER.info("The pre-check is failed before execute command!") - return FAIL, cve_scan_result + return PRE_CHECK_ERROR, cve_scan_result self.installed_rpm_info = self._query_installed_rpm() self.available_rpm_info = self._query_available_rpm() @@ -200,7 +191,7 @@ class VulnerabilityManage: # kernel-debuginfo.x86_64 5.10.0-60.105.0.132.oe2203 update # kernel-debugsource.x86_64 5.10.0-60.105.0.132.oe2203 update # kernel-devel.x86_64 5.10.0-60.105.0.132.oe2203 update - code, stdout, stderr = execute_shell_command("dnf list available") + code, stdout, stderr = execute_shell_command("dnf list available|grep -v '.src'") if code != CommandExitCode.SUCCEED: LOGGER.error(stderr) return result @@ -435,7 +426,7 @@ class VulnerabilityManage: if not applied_hotpatch_info_list: return result - record_key_set = {} + record_key_set = set() for cve_id, patch_name, hotpatch_status in applied_hotpatch_info_list: rpm = patch_name.split("-", 1)[0] # Refer to this example, the CVE can be marked as fixed only if all hotpatch are applied. @@ -513,10 +504,10 @@ class VulnerabilityManage: def gen_failed_result(cves: dict, log: str): result = [] for cve in cves: - cve_fix_result = {"cve_id": cve.get("cve_id"), "result": "failed", "rpms": []} + cve_fix_result = {"cve_id": cve.get("cve_id"), "result": TaskExecuteRes.FAIL, "rpms": []} for rpm in cve.get("rpms"): cve_fix_result["rpms"].append( - {"installed_rpm": rpm.get("installed_rpm"), "result": "fail", "log": log} + {"installed_rpm": rpm.get("installed_rpm"), "result": TaskExecuteRes.FAIL, "log": log} ) result.append(cve_fix_result) return result @@ -528,14 +519,14 @@ class VulnerabilityManage: if not check_result: LOGGER.info("The pre-check is failed before execute command!") result["cves"] = gen_failed_result(unfixed_cve_info.get("cves"), "pre-check items check failed") - return FAIL, result + return PRE_CHECK_ERROR, result all_cve_fix_result, all_cve_fix_info = [], [] for cve_info in unfixed_cve_info.get("cves"): rpms_fix_result, rpm_fix_result_list = [], [] for rpm_info in cve_info.get("rpms"): update_result, log = self._update_rpm_by_dnf(rpm_info) - rpms_fix_result.append(update_result == SUCCESS) + rpms_fix_result.append(update_result == TaskExecuteRes.SUCCEED) rpm_fix_result_list.append( {"installed_rpm": rpm_info.get("installed_rpm"), "result": update_result, "log": log} ) @@ -544,13 +535,13 @@ class VulnerabilityManage: all_cve_fix_info.append( { "cve_id": cve_info.get("cve_id"), - "result": SUCCESS if all(rpms_fix_result) else FAIL, + "result": TaskExecuteRes.SUCCEED if all(rpms_fix_result) else TaskExecuteRes.FAIL, "rpms": rpm_fix_result_list, } ) result.update({"cves": all_cve_fix_info}) - return SUCCESS if all(all_cve_fix_result) else FAIL, result + return TaskExecuteRes.SUCCEED if all(all_cve_fix_result) else TaskExecuteRes.FAIL, result def _update_rpm_by_dnf(self, cve: dict) -> Tuple[str, str]: """ @@ -587,10 +578,10 @@ class VulnerabilityManage: """ code, stdout, stderr = execute_shell_command(f"dnf update {rpm_name} -y") if code != CommandExitCode.SUCCEED: - return FAIL, stderr + return TaskExecuteRes.FAIL, stderr if "Complete" not in stdout: - return FAIL, stdout - return SUCCESS, stdout + return TaskExecuteRes.FAIL, stdout + return TaskExecuteRes.SUCCEED, stdout def _update_hotpatch_by_dnf_plugin(self, hotpatch_pkg: str) -> Tuple[str, str]: """ @@ -611,10 +602,10 @@ class VulnerabilityManage: code, stdout, stderr = execute_shell_command(update_command) if code != CommandExitCode.SUCCEED: - return FAIL, stderr + return TaskExecuteRes.FAIL, stderr if "Apply hot patch succeed" not in stdout and "No hot patches marked for install" not in stdout: - return FAIL, stdout + return TaskExecuteRes.FAIL, stdout if not self.takeover and self.accepted: try: @@ -626,7 +617,7 @@ class VulnerabilityManage: LOGGER.error(error) stdout += "\n" + "hotpatch status set failed due to can't get correct hotpatch name!" - return SUCCESS, stdout + return TaskExecuteRes.SUCCEED, stdout @staticmethod def _set_hotpatch_status_by_dnf_plugin(hotpatch: str, operation: str) -> Tuple[bool, str]: @@ -703,25 +694,47 @@ class VulnerabilityManage: log = "No valid hot patch is matched." return NOT_PATCH, [dict(cve_id=cve["cve_id"], log=log, result="fail") for cve in cves] - cmd_execute_result = [] - for base_pkg, hotpatch_cves in hotpatch_list.items(): - rollback_result, log = self._hotpatch_rollback(base_pkg) - result = 'succeed' if rollback_result else 'fail' - cmd_execute_result.extend([dict(cve_id=cve, log=log, result=result) for cve in hotpatch_cves]) + wait_to_rollback_patch = [] + for cve_info in cves: + wait_to_rollback_patch.extend(hotpatch_list.get(cve_info["cve_id"], [])) - not_rollback_cve = set([cve["cve_id"] for cve in cves]).difference( - set([cve_info["cve_id"] for cve_info in cmd_execute_result]) - ) + hotpatch_rollback_res = {} + for patch in set(wait_to_rollback_patch): + rollback_result, log = self._hotpatch_rollback(patch) + hotpatch_rollback_res[patch] = { + "result": TaskExecuteRes.SUCCEED if rollback_result else TaskExecuteRes.FAIL, + "log": log, + } + + cve_rollback_result = [] + + for cve_info in cves: + if cve_info["cve_id"] not in hotpatch_list: + fail_result = { + "cve_id": cve_info["cve_id"], + "log": "No valid hot patch is matched." + if cve_info["hotpatch"] + else "Cold patch rollback is not supported.", + "result": "fail", + } + cve_rollback_result.append(fail_result) + else: + tmp_result_list = [] + tmp_log = [] - if not_rollback_cve: - for cve in [cve for cve in cves if cve["cve_id"] in not_rollback_cve]: - if cve["hotpatch"]: - log = "No valid hot patch is matched." - else: - log = "Cold patch rollback is not supported." - cmd_execute_result.append(dict(cve_id=cve["cve_id"], log=log, result="fail")) + for patch in hotpatch_list.get(cve_info["cve_id"]): + tmp_result_list.append(hotpatch_rollback_res[patch]["result"] == TaskExecuteRes.SUCCEED) + tmp_log.append(hotpatch_rollback_res[patch]["log"]) - return SUCCESS, cmd_execute_result + cve_rollback_result.append( + { + "cve_id": cve_info["cve_id"], + "log": "\n".join(tmp_log), + "result": TaskExecuteRes.SUCCEED if all(tmp_result_list) else TaskExecuteRes.FAIL, + } + ) + + return SUCCESS, cve_rollback_result @staticmethod def _hotpatch_list_cve() -> dict: @@ -731,15 +744,14 @@ class VulnerabilityManage: Returns: dict: e.g { - "base-pkg/hotpatch-1": [cve_id1,cveid2], - "base-pkg/hotpatch-2": [cve_id1,cveid2] + "CVE-XXXX-XXX": ["patch 1", "patch 2"] } """ # Run the dnf command to query the hotpatch list,e.g # Last metadata expiration check: - # CVE id base-pkg/hotpatch status - # CVE-1 A-1.1-1/HP1 ACTIVED - # CVE-2 A-1.1-1/HP1 ACTIVED + # CVE id base-pkg/hotpatch status + # CVE-1 A-1.1-1/ACC-1-1/binary_file1 ACTIVED + # CVE-2 A-1.1-1/ACC-1-1/binary_file2 ACTIVED code, hotpatch_list_output, _ = execute_shell_command(f"dnf hotpatch --list cve") if code != CommandExitCode.SUCCEED: LOGGER.error(f"Failed to hotpatch list cve.") @@ -754,7 +766,7 @@ class VulnerabilityManage: cve_id, base_pkg, status = [info.strip() for info in hotpatch_info.split()] if status != "ACTIVED" and status != "ACCEPTED": continue - hotpatch_list[base_pkg].append(cve_id) + hotpatch_list[cve_id].append(base_pkg) return hotpatch_list @@ -765,11 +777,7 @@ class VulnerabilityManage: Args: cve_id: cve is rolled back """ - execute_result, hotpatch_release_info = self._hotpatch_info(base_pkg_hotpatch) - if not execute_result: - return False, "Failed to query patch information." - - hotpatch_name = "patch-%s-%s-%s-%s" % tuple(base_pkg_hotpatch.split("/") + list(hotpatch_release_info.values())) + hotpatch_name = "patch-%s-%s" % tuple(base_pkg_hotpatch.rsplit("/", 2)[:2]) _, stdout, stderr = execute_shell_command(f"dnf remove {hotpatch_name} -y") return True, stdout + stderr -- Gitee