Binary files oec-hardware/docs/test-flow.png and oec-hardware_new/docs/test-flow.png differ diff -urN oec-hardware/hwcompatible/client.py oec-hardware_new/hwcompatible/client.py --- oec-hardware/hwcompatible/client.py 2021-09-13 11:43:59.390364522 +0800 +++ oec-hardware_new/hwcompatible/client.py 2021-09-13 11:42:03.381474028 +0800 @@ -12,15 +12,16 @@ # See the Mulan PSL v2 for more details. # Create: 2020-04-01 +"""upload file""" + import os import base64 try: from urllib.parse import urlencode from urllib.request import urlopen, Request - from urllib.error import HTTPError except ImportError: from urllib import urlencode - from urllib2 import urlopen, Request, HTTPError + from urllib2 import urlopen, Request class Client: @@ -29,7 +30,7 @@ """ def __init__(self, host, oec_id): self.host = host - self.id = oec_id + self.oec_id = oec_id self.form = {} def upload(self, files, server='localhost'): @@ -42,17 +43,17 @@ filename = os.path.basename(files) try: job = filename.split('.')[0] - with open(files, 'rb') as f: - filetext = base64.b64encode(f.read()) - except Exception as e: - print(e) + with open(files, 'rb') as file: + filetext = base64.b64encode(file.read()) + except Exception as excp: + print(excp) return False - if not self.host or not self.id: - print("Missing host({0}) or id({1})".format(self.host, self.id)) + if not self.host or not self.oec_id: + print("Missing host({0}) or id({1})".format(self.host, self.oec_id)) return False self.form['host'] = self.host - self.form['id'] = self.id + self.form['id'] = self.oec_id self.form['job'] = job self.form['filetext'] = filetext @@ -70,8 +71,8 @@ print("Error: upload failed, %s" % res.msg) return False return True - except Exception as e: - print(e) + except Exception as excp: + print(excp) return False diff -urN oec-hardware/hwcompatible/command.py oec-hardware_new/hwcompatible/command.py --- oec-hardware/hwcompatible/command.py 2021-09-13 11:43:59.390364522 +0800 +++ oec-hardware_new/hwcompatible/command.py 2021-09-13 11:42:03.381474028 +0800 @@ -45,7 +45,7 @@ encoding='utf8') (output, errors) = self.pipe.communicate() if output: - #Strip new line character/s if any from the end of output string + # Strip new line character/s if any from the end of output string output = output.rstrip('\n') self.origin_output = output self.output = output.splitlines() @@ -54,6 +54,7 @@ self.returncode = self.pipe.returncode def start(self): + """start command""" if sys.version_info.major < 3: self.pipe = subprocess.Popen(self.command, shell=True, stdin=subprocess.PIPE, @@ -82,14 +83,15 @@ # raise CertCommandError(self, "has output on stderr") def run_quiet(self): + """quiet after running command""" self._run() if self.returncode != 0: raise CertCommandError(self, "returned %d" % self.returncode) def echo(self, ignore_errors=False): + """Print information to terminal""" self.run(ignore_errors) self.print_output() - return def print_output(self): """ @@ -141,49 +143,39 @@ return self.pipe.stdout.read().decode('utf-8', 'ignore').rstrip() def poll(self): + """get poll message""" if self.pipe: return self.pipe.poll() - def _get_str(self, regex=None, regex_group=None, single_line=True, return_list=False): - self.regex = regex - self.single_line = single_line - self.regex_group = regex_group - - self._run() - - if self.single_line: - if self.output and len(self.output) > 1: - raise CertCommandError(self, "Found %u lines of output, expected 1" % len(self.output)) + def _get_str_single_line(self): + if self.output and len(self.output) > 1: + raise CertCommandError(self, "Found %u lines of output, " + "expected 1" % len(self.output)) - if self.output: - line = self.output[0].strip() - if not self.regex: - return line - # otherwise, try the regex - pattern = re.compile(self.regex) - match = pattern.match(line) + if self.output: + line = self.output[0].strip() + if not self.regex: + return line + # otherwise, try the regex + pattern = re.compile(self.regex) + match = pattern.match(line) + if match: + if self.regex_group: + return match.group(self.regex_group) + # otherwise, no group, return the whole line + return line + + # no regex match try a grep-style match + if not self.regex_group: + match = pattern.search(line) if match: - if self.regex_group: - return match.group(self.regex_group) - # otherwise, no group, return the whole line - return line - - # no regex match try a grep-style match - if not self.regex_group: - match = pattern.search(line) - if match: - return match.group() + return match.group() - # otherwise - raise CertCommandError(self, "no match for regular expression %s" % self.regex) + # otherwise + raise CertCommandError(self, "no match for regular " + "expression %s" % self.regex) - #otherwise, multi-line or single-line regex - if not self.regex: - raise CertCommandError(self, "no regular expression set for multi-line command") - pattern = re.compile(self.regex) - result = None - if return_list: - result = list() + def _get_str_multi_line(self, result, pattern, return_list): if self.output: for line in self.output: if self.regex_group: @@ -202,12 +194,32 @@ result.append(match.group()) else: return match.group() - if result: - return result + return result + + def _get_str(self, regex=None, regex_group=None, + single_line=True, return_list=False): + self.regex = regex + self.single_line = single_line + self.regex_group = regex_group + + self._run() + + if self.single_line: + return self._get_str_single_line() - raise CertCommandError(self, "no match for regular expression %s" % self.regex) + # otherwise, multi-line or single-line regex + if not self.regex: + raise CertCommandError(self, "no regular expression " + "set for multi-line command") + pattern = re.compile(self.regex) + result = None + if return_list: + result = list() + return self._get_str_multi_line(result, pattern, return_list) - def get_str(self, regex=None, regex_group=None, single_line=True, return_list=False, ignore_errors=False): + def get_str(self, regex=None, regex_group=None, single_line=True, + return_list=False, ignore_errors=False): + """获取命令执行结果中匹配的值""" result = self._get_str(regex, regex_group, single_line, return_list) if not ignore_errors: if self.returncode != 0: @@ -226,7 +238,7 @@ Cert command error handling """ def __init__(self, command, message): - super(CertCommandError, self).__init__() + Exception.__init__(self) self.message = message self.command = command self.__message = None @@ -234,7 +246,9 @@ def __str__(self): return "\"%s\" %s" % (self.command.command, self.message) - def _get_message(self): return self.__message - def _set_message(self, value): self.__message = value - message = property(_get_message, _set_message) + def _get_message(self): + return self.__message + def _set_message(self, value): + self.__message = value + message = property(_get_message, _set_message) diff -urN oec-hardware/hwcompatible/commandUI.py oec-hardware_new/hwcompatible/commandUI.py --- oec-hardware/hwcompatible/commandUI.py 2021-09-13 11:43:59.390364522 +0800 +++ oec-hardware_new/hwcompatible/commandUI.py 2021-09-13 11:42:03.381474028 +0800 @@ -23,7 +23,7 @@ def __init__(self, echoResponses=False): self.echo = echoResponses - def printPipe(self, pipe): + def print_pipe(self, pipe): """ print pipe data :param pipe: @@ -123,6 +123,7 @@ reply = input(label).strip() if not choices or reply in choices: return reply - print("Please enter one of the following: %s" % " | ".join(choices)) + print("Please enter one of the " + "following: %s" % " | ".join(choices)) finally: readline.set_startup_hook() diff -urN oec-hardware/hwcompatible/compatibility.py oec-hardware_new/hwcompatible/compatibility.py --- oec-hardware/hwcompatible/compatibility.py 2021-09-13 11:43:59.390364522 +0800 +++ oec-hardware_new/hwcompatible/compatibility.py 2021-09-13 11:42:03.381474028 +0800 @@ -60,7 +60,8 @@ self.devices = DeviceDocument(CertEnv.devicefile, oec_devices) self.devices.save() - # test_factory format example: [{"name":"nvme", "device":device, "run":True, "status":"PASS", "reboot":False}] + # test_factory format example: [{"name":"nvme", "device":device, + # "run":True, "status":"PASS", "reboot":False}] test_factory = self.get_tests(oec_devices) self.update_factory(test_factory) if not self.choose_tests(): @@ -95,7 +96,8 @@ clean all compatibility test file :return: """ - if self.ui.prompt_confirm("Are you sure to clean all compatibility test data?"): + if self.ui.prompt_confirm("Are you sure to clean all " + "compatibility test data?"): try: Command("rm -rf %s" % CertEnv.certificationfile).run() Command("rm -rf %s" % CertEnv.factoryfile).run() @@ -149,7 +151,8 @@ cwd = os.getcwd() os.chdir(os.path.dirname(doc_dir)) - dir_name = "oech-" + datetime.datetime.now().strftime("%Y%m%d%H%M%S") + "-" + job.job_id + dir_name = "oech-" + datetime.datetime.now().strftime("%Y%m%d%H%M%S")\ + + "-" + job.job_id pack_name = dir_name + ".tar" cmd = Command("tar -cf %s %s" % (pack_name, dir_name)) try: @@ -213,11 +216,8 @@ :param devices: :return: """ - nodevice = ["cpufreq", "memory", "clock", "profiler", "system", "stress", "kdump", "perf", "acpi", "watchdog"] - ethernet = ["ethernet"] - infiniband = ["infiniband"] - storage = ["nvme", "disk", "nvdimm"] - cdrom = ["cdrom"] + nodevice = ["cpufreq", "memory", "clock", "profiler", "system", + "stress", "kdump", "perf", "acpi", "watchdog"] sort_devices = self.sort_tests(devices) empty_device = Device() test_factory = list() @@ -225,7 +225,8 @@ for (dirpath, dirs, filenames) in os.walk(CertEnv.testdirectoy): dirs.sort() for filename in filenames: - if filename.endswith(".py") and not filename.startswith("__init__"): + if filename.endswith(".py") and \ + not filename.startswith("__init__"): casenames.append(filename.split(".")[0]) for testname in casenames: @@ -258,15 +259,18 @@ empty_device = Device() for device in devices: if device.get_property("SUBSYSTEM") == "usb" and \ - device.get_property("ID_VENDOR_FROM_DATABASE") == "Linux Foundation" and \ + device.get_property("ID_VENDOR_FROM_DATABASE") == \ + "Linux Foundation" and \ ("2." in device.get_property("ID_MODEL_FROM_DATABASE") or "3." in device.get_property("ID_MODEL_FROM_DATABASE")): sort_devices["usb"] = [empty_device] continue - if device.get_property("PCI_CLASS") == "30000" or device.get_property("PCI_CLASS") == "38000": + if device.get_property("PCI_CLASS") == "30000" or \ + device.get_property("PCI_CLASS") == "38000": sort_devices["video"] = [device] continue - if (device.get_property("DEVTYPE") == "disk" and not device.get_property("ID_TYPE")) or \ + if (device.get_property("DEVTYPE") == "disk" and + not device.get_property("ID_TYPE")) or \ device.get_property("ID_TYPE") == "disk": if "nvme" in device.get_property("DEVPATH"): sort_devices["disk"] = [empty_device] @@ -274,11 +278,10 @@ sort_devices["nvme"].extend([device]) except KeyError: sort_devices["nvme"] = [device] - continue elif "/host" in device.get_property("DEVPATH"): sort_devices["disk"] = [empty_device] - continue - if device.get_property("SUBSYSTEM") == "net" and device.get_property("INTERFACE"): + if device.get_property("SUBSYSTEM") == "net" and \ + device.get_property("INTERFACE"): interface = device.get_property("INTERFACE") nmcli = Command("nmcli device") nmcli.start() @@ -304,7 +307,7 @@ break continue if device.get_property("ID_CDROM") == "1": - types = ["DVD_RW", "DVD_PLUS_RW", "DVD_R", "DVD_PLUS_R", "DVD", \ + types = ["DVD_RW", "DVD_PLUS_RW", "DVD_R", "DVD_PLUS_R", "DVD", "BD_RE", "BD_R", "BD", "CD_RW", "CD_R", "CD"] for dev_type in types: if device.get_property("ID_CDROM_" + dev_type) == "1": @@ -316,7 +319,8 @@ if device.get_property("SUBSYSTEM") == "ipmi": sort_devices["ipmi"] = [empty_device] try: - Command("dmidecode").get_str("IPMI Device Information", single_line=False) + Command("dmidecode").get_str("IPMI Device Information", + single_line=False) sort_devices["ipmi"] = [empty_device] except: pass @@ -353,21 +357,24 @@ test["run"] = True continue - try: - num = int(reply) - except: - continue + num_lst = reply.split(" ") + for num in num_lst: + try: + num = int(num) + except ValueError: + continue - if num > 0 and num <= len(self.test_factory): - self.test_factory[num - 1]["run"] = not self.test_factory[num - 1]["run"] - continue + if 0 < num <= len(self.test_factory): + self.test_factory[num - 1]["run"] = not \ + self.test_factory[num - 1]["run"] + continue def show_tests(self): """ show test items :return: """ - print("\033[1;35m" + "No.".ljust(4) + "Run-Now?".ljust(10) \ + print("\033[1;35m" + "No.".ljust(4) + "Run-Now?".ljust(10) + "Status".ljust(8) + "Class".ljust(14) + "Device\033[0m") num = 0 for test in self.test_factory: @@ -385,16 +392,16 @@ num = num + 1 if status == "PASS": - print("%-6d" % num + run.ljust(8) + "\033[0;32mPASS \033[0m" \ + print("%-6d" % num + run.ljust(8) + "\033[0;32mPASS \033[0m" + name.ljust(14) + "%s" % device) elif status == "FAIL": - print("%-6d" % num + run.ljust(8) + "\033[0;31mFAIL \033[0m" \ + print("%-6d" % num + run.ljust(8) + "\033[0;31mFAIL \033[0m" + name.ljust(14) + "%s" % device) elif status == "Force": - print("%-6d" % num + run.ljust(8) + "\033[0;33mForce \033[0m" \ + print("%-6d" % num + run.ljust(8) + "\033[0;33mForce \033[0m" + name.ljust(14) + "%s" % device) else: - print("%-6d" % num + run.ljust(8) + "\033[0;34mNotRun \033[0m" \ + print("%-6d" % num + run.ljust(8) + "\033[0;34mNotRun \033[0m" + name.ljust(14) + "%s" % device) def choose_tests(self): @@ -408,19 +415,20 @@ else: test["run"] = True os.system("clear") - print("These tests are recommended to complete the compatibility test:") + print("These tests are recommended to " + "complete the compatibility test:") self.show_tests() - action = self.ui.prompt("Ready to begin testing?", ["run", "edit", "quit"]) + action = self.ui.prompt("Ready to begin testing?", + ["run", "edit", "quit"]) action = action.lower() if action in ["r", "run"]: return True - elif action in ["q", "quit"]: + if action in ["q", "quit"]: return False - elif action in ["e", "edit"]: + if action in ["e", "edit"]: return self.edit_tests() - else: - print("Invalid choice!") - return self.choose_tests() + print("Invalid choice!") + return self.choose_tests() def check_result(self): """ @@ -443,15 +451,16 @@ if not self.test_factory: self.test_factory = test_factory else: - factory_changed = False for test in self.test_factory: if not self.search_factory(test, test_factory): self.test_factory.remove(test) - print("delete %s test %s" % (test["name"], test["device"].get_name())) + print("delete %s test %s" % (test["name"], + test["device"].get_name())) for test in test_factory: if not self.search_factory(test, self.test_factory): self.test_factory.append(test) - print("add %s test %s" % (test["name"], test["device"].get_name())) + print("add %s test %s" % (test["name"], + test["device"].get_name())) self.test_factory.sort(key=lambda k: k["name"]) FactoryDocument(CertEnv.factoryfile, self.test_factory).save() @@ -463,6 +472,7 @@ :return: """ for test in test_factory: - if test["name"] == obj_test["name"] and test["device"].path == obj_test["device"].path: + if test["name"] == obj_test["name"] and \ + test["device"].path == obj_test["device"].path: return True return False diff -urN oec-hardware/hwcompatible/device.py oec-hardware_new/hwcompatible/device.py --- oec-hardware/hwcompatible/device.py 2021-09-13 11:43:59.390364522 +0800 +++ oec-hardware_new/hwcompatible/device.py 2021-09-13 11:42:03.381474028 +0800 @@ -12,7 +12,7 @@ # See the Mulan PSL v2 for more details. # Create: 2020-04-01 -from .command import Command, CertCommandError +from .command import Command def filter_char(string): @@ -25,8 +25,8 @@ filtered = u'' start = 0 for i in range(len(string)): - c = string[i] - if c in ascii_blacklist or (type(string) != unicode and ord(c) >= 128): + char_filter = string[i] + if char_filter in ascii_blacklist or (type(string) != unicode and ord(char_filter) >= 128): if start < i: filtered += string[start:i] start = i + 1 @@ -110,10 +110,8 @@ """ if "INTERFACE" in self.properties.keys(): return self.properties["INTERFACE"] - elif "DEVNAME" in self.properties.keys(): + if "DEVNAME" in self.properties.keys(): return self.properties["DEVNAME"].split("/")[-1] - elif self.path: + if self.path: return self.path.split("/")[-1] - else: - return "" - + return "" diff -urN oec-hardware/hwcompatible/document.py oec-hardware_new/hwcompatible/document.py --- oec-hardware/hwcompatible/document.py 2021-09-13 11:43:59.390364522 +0800 +++ oec-hardware_new/hwcompatible/document.py 2021-09-13 11:42:03.381474028 +0800 @@ -12,8 +12,9 @@ # See the Mulan PSL v2 for more details. # Create: 2020-04-01 -import json +"""Document processing""" +import json from .commandUI import CommandUI from .command import Command from .device import Device @@ -33,23 +34,25 @@ print("doc new") def save(self): + """save file""" try: with open(self.filename, "w+") as save_f: json.dump(self.document, save_f, indent=4) save_f.close() - except Exception as e: + except Exception as concrete_error: print("Error: doc save fail.") - print(e) + print(concrete_error) return False return True def load(self): + """load file""" try: with open(self.filename, "r") as load_f: self.document = json.load(load_f) load_f.close() return True - except: + except Exception: return False @@ -86,33 +89,53 @@ self.document[key] = value else: break - except Exception as e: + except Exception as concrete_error: print("Error: get hardware info fail.") - print(e) + print(concrete_error) sysinfo = SysInfo(CertEnv.releasefile) self.document["OS"] = sysinfo.product + " " + sysinfo.get_version() self.document["kernel"] = sysinfo.kernel self.document["ID"] = CommandUI().prompt("Please provide your Compatibility Test ID:") self.document["Product URL"] = CommandUI().prompt("Please provide your Product URL:") - self.document["server"] = CommandUI().prompt("Please provide the Compatibility Test Server (Hostname or Ipaddr):") + self.document["server"] = CommandUI().prompt("Please provide the Compatibility Test " + "Server (Hostname or Ipaddr):") def get_hardware(self): - return self.document["Manufacturer"] + " " + self.document["Product Name"] + " " + self.document["Version"] + """ + Get hardware information + """ + return self.document["Manufacturer"] + " " + self.document["Product Name"] + " " \ + + self.document["Version"] def get_os(self): + """ + Get os information + """ return self.document["OS"] def get_server(self): + """ + Get server information + """ return self.document["server"] def get_url(self): + """ + Get url + """ return self.document["Product URL"] def get_certify(self): + """ + Get certify + """ return self.document["ID"] def get_kernel(self): + """ + Get kernel information + """ return self.document["kernel"] @@ -179,17 +202,23 @@ self.load() def load(self): - fp = open(self.filename) - self.config = fp.readlines() + """ + Load config file + """ + fp_info = open(self.filename) + self.config = fp_info.readlines() for line in self.config: if line.strip() and line.strip()[0] == "#": continue words = line.strip().split(" ") if words[0]: self.parameters[words[0]] = " ".join(words[1:]) - fp.close() + fp_info.close() def get_parameter(self, name): + """ + Get parameter + """ if self.parameters: try: return self.parameters[name] @@ -198,6 +227,9 @@ return None def dump(self): + """ + Dump + """ for line in self.config: string = line.strip() if not string or string[0] == "#": @@ -205,6 +237,9 @@ print(string) def add_parameter(self, name, value): + """ + add parameter + """ if not self.getParameter(name): self.parameters[name] = value self.config.append("%s %s\n" % (name, value)) @@ -238,7 +273,7 @@ Save the config property value to a file :return: """ - fp = open(self.filename, "w") + fp_info = open(self.filename, "w") for line in self.config: - fp.write(line) - fp.close() + fp_info.write(line) + fp_info.close() diff -urN oec-hardware/hwcompatible/env.py oec-hardware_new/hwcompatible/env.py --- oec-hardware/hwcompatible/env.py 2021-09-13 11:43:59.390364522 +0800 +++ oec-hardware_new/hwcompatible/env.py 2021-09-13 11:42:03.381474028 +0800 @@ -12,6 +12,8 @@ # See the Mulan PSL v2 for more details. # Create: 2020-04-01 +"""Certification file path""" + class CertEnv: """ @@ -28,5 +30,3 @@ logdirectoy = "/usr/share/oech/logs" resultdirectoy = "/usr/share/oech/lib/server/results" kernelinfo = "/usr/share/oech/kernelrelease.json" - - diff -urN oec-hardware/hwcompatible/job.py oec-hardware_new/hwcompatible/job.py --- oec-hardware/hwcompatible/job.py 2021-09-13 11:43:59.390364522 +0800 +++ oec-hardware_new/hwcompatible/job.py 2021-09-13 11:42:03.381474028 +0800 @@ -12,6 +12,8 @@ # See the Mulan PSL v2 for more details. # Create: 2020-04-01 +"""Test task management""" + import os import sys import string @@ -26,7 +28,7 @@ from .reboot import Reboot -class Job(object): +class Job(): """ Test task management """ @@ -42,7 +44,7 @@ self.test_factory = getattr(args, "test_factory", []) self.test_suite = [] self.job_id = ''.join(random.sample(string.ascii_letters + string.digits, 10)) - self.ui = CommandUI() + self.com_ui = CommandUI() self.subtests_filter = getattr(args, "subtests_filter", None) self.test_parameters = None @@ -74,9 +76,9 @@ sys.path.insert(0, dirpath) try: module = __import__(testname, globals(), locals()) - except Exception as e: + except Exception as concrete_error: print("Error: module import failed for %s" % testname) - print(e) + print(concrete_error) return None for thing in dir(module): @@ -144,8 +146,8 @@ try: cmd = Command("yum install -y " + " ".join(required_rpms)) cmd.echo() - except CertCommandError as e: - print(e) + except CertCommandError as concrete_error: + print(concrete_error) print("Fail to install required packages.") return False @@ -182,8 +184,8 @@ return_code = test.test() else: return_code = test.test() - except Exception as e: - print(e) + except Exception as concrete_error: + print(concrete_error) return_code = False if reboot: @@ -251,5 +253,6 @@ """ for test in self.test_factory: for testcase in self.test_suite: - if test["name"] == testcase["name"] and test["device"].path == testcase["device"].path: + if test["name"] == testcase["name"] and test["device"].path == \ + testcase["device"].path: test["status"] = testcase["status"] diff -urN oec-hardware/hwcompatible/log.py oec-hardware_new/hwcompatible/log.py --- oec-hardware/hwcompatible/log.py 2021-09-13 11:43:59.390364522 +0800 +++ oec-hardware_new/hwcompatible/log.py 2021-09-13 11:42:03.381474028 +0800 @@ -12,6 +12,8 @@ # See the Mulan PSL v2 for more details. # Create: 2020-04-01 +"""Log management""" + import os import sys import datetime diff -urN oec-hardware/hwcompatible/reboot.py oec-hardware_new/hwcompatible/reboot.py --- oec-hardware/hwcompatible/reboot.py 2021-09-13 11:43:59.390364522 +0800 +++ oec-hardware_new/hwcompatible/reboot.py 2021-09-13 11:42:03.381474028 +0800 @@ -12,7 +12,10 @@ # See the Mulan PSL v2 for more details. # Create: 2020-04-01 +"""Special for restart tasks, so that the test can be continued after the machine is restarted""" + import datetime +import os from .document import Document, FactoryDocument from .env import CertEnv @@ -73,7 +76,7 @@ try: Command("systemctl daemon-reload").run_quiet() Command("systemctl enable oech").run_quiet() - except: + except Exception: print("Error: enable oech.service fail.") return False @@ -85,17 +88,20 @@ :return: """ doc = Document(CertEnv.rebootfile) - if not doc.load(): - print("Error: reboot file load fail.") + if os.path.exists(CertEnv.rebootfile): + if not doc.load(): + print("Error: reboot111 file load fail.") + return False + else: return False - + try: self.testname = doc.document["test"] self.reboot = doc.document self.job.job_id = self.reboot["job_id"] self.job.subtests_filter = self.reboot["rebootup"] time_reboot = datetime.datetime.strptime(self.reboot["time"], "%Y%m%d%H%M%S") - except: + except Exception: print("Error: reboot file format not as expect.") return False @@ -108,4 +114,3 @@ return False return True - diff -urN oec-hardware/hwcompatible/sysinfo.py oec-hardware_new/hwcompatible/sysinfo.py --- oec-hardware/hwcompatible/sysinfo.py 2021-09-13 11:43:59.390364522 +0800 +++ oec-hardware_new/hwcompatible/sysinfo.py 2021-09-13 11:42:03.381474028 +0800 @@ -12,6 +12,8 @@ # See the Mulan PSL v2 for more details. # Create: 2020-04-01 +"""Get system information""" + import os import re @@ -40,10 +42,10 @@ :return: """ try: - f = open(filename) - text = f.read() - f.close() - except: + file_content = open(filename) + text = file_content.read() + file_content.close() + except Exception: print("Release file not found.") return @@ -56,12 +58,12 @@ results = pattern.findall(text) self.version = results[0].strip() if results else "" - with os.popen('uname -m') as p: - self.arch = p.readline().strip() + with os.popen('uname -m') as pop: + self.arch = pop.readline().strip() self.debug_kernel = "debug" in self.arch - with os.popen('uname -r') as p: - self.kernel = p.readline().strip() + with os.popen('uname -r') as pop: + self.kernel = pop.readline().strip() self.kernel_rpm = "kernel-{}".format(self.kernel) self.kerneldevel_rpm = "kernel-devel-{}".format(self.kernel) self.kernel_version = self.kernel.split('-')[0] @@ -72,4 +74,3 @@ :return: """ return self.version - diff -urN oec-hardware/hwcompatible/test.py oec-hardware_new/hwcompatible/test.py --- oec-hardware/hwcompatible/test.py 2021-09-13 11:43:59.390364522 +0800 +++ oec-hardware_new/hwcompatible/test.py 2021-09-13 11:42:03.381474028 +0800 @@ -12,6 +12,8 @@ # See the Mulan PSL v2 for more details. # Create: 2020-04-01 +"""Test set template""" + class Test: """ @@ -24,7 +26,13 @@ self.rebootup = None def setup(self, args=None): + """ + setup + """ pass def teardown(self): + """ + teardown + """ pass diff -urN oec-hardware/hwcompatible/version.py oec-hardware_new/hwcompatible/version.py --- oec-hardware/hwcompatible/version.py 1970-01-01 08:00:00.000000000 +0800 +++ oec-hardware_new/hwcompatible/version.py 2021-09-13 11:42:03.381474028 +0800 @@ -0,0 +1,3 @@ +# hwcompatible/version.py is automatically-generated +version = '' +name = 'oec-hardware' diff -urN oec-hardware/README.md oec-hardware_new/README.md --- oec-hardware/README.md 2021-09-13 11:43:59.390364522 +0800 +++ oec-hardware_new/README.md 2021-09-13 11:43:06.821961002 +0800 @@ -31,15 +31,15 @@ ## 背景介绍 -OS 厂商为了扩大自己产品的兼容性范围,常常寻求与硬件厂商的合作,进行兼容性测试。OS 厂商制定一个测试标准,并提供测试用例,硬件厂商进行实际的测试,测试通过后,OS 厂商和硬件厂商将共同对结果负责。这是一个双赢的合作,双方都可以藉此推销自己的产品。 +OS 厂商为了扩大自己产品的兼容性范围,常常寻求与硬件厂商的合作,进行兼容性测试。OS 厂商制定一个测试标准,并提供测试用例,硬件厂商进行实际的测试。测试通过后,OS 厂商和硬件厂商将共同在对应的官网发布兼容性信息。 -验证目的就是保证 OS 与硬件平台的兼容性,认证仅限于基本功能验证,不包括性能测试等其它测试。 +验证目的就是保证 OS 与硬件平台的兼容性,验证仅限于基本功能验证,不包括性能测试等其它测试。 openEuler硬件兼容性验证测试框架有如下特点: 1. 为满足可信要求,必须使用openEuler操作系统,不能随意重编/插入内核模块。 2. 通过扫描机制自适应发现硬件列表,来确定要运行的测试用例集合。 -3. 面向对象抽象各种硬件类型以及测试用例类,用于扩展开发。 +3. 面向对象抽象各种硬件类型以及测试用例类,用于扩展开发。 ## 原理简介 @@ -100,33 +100,19 @@ ## 获取安装包 -* 安装包从 openEuler 官方网站下载(暂未开放)。 - -* 校验安装包的完整性。 - - 1. 获取校验文件中的校验值: - - ``` - cat oec-hardware-*.rpm.sha256sum - ``` - - 2. 计算文件的 sha256 校验值: - - ``` - sha256sum oec-hardware-*.rpm - ``` - - 命令执行完成后,输出校验值。 - - 3. 对比步骤1和步骤2计算的校验值是否一致。 - - 如果校验值一致说明安装文件完整性没有破坏,如果校验值不一致则可以确认文件完整性已被破坏,需要重新获取。 +https://gitee.com/src-openeuler/oec-hardware/releases ## 安装过程 ### 客户端 -1. 配置 [openEuler 官方 repo](https://repo.openeuler.org/) 中对应版本的 everything 源,使用 `dnf` 安装客户端 oec-hardware。 +1. 安装 memtester 依赖工具。 + + ``` + dnf install memtester-4.3.0-18.fc33.aarch64(x86_64).rpm + ``` + +2. 配置 [openEuler 官方 repo](https://repo.openeuler.org/) 中对应版本的 everything 源,使用 `dnf` 安装客户端 oec-hardware。 ``` dnf install oec-hardware-XXX.rpm @@ -277,7 +263,6 @@ 1. **system** - 注意:在安装openEuler系统的时候,非最小安装会引入kmod-kvdo模块,此模块会修改内核,导致system测试项"检测内核是否被修改"不过。若非有意引入,可卸载该模块。 - 检查本工具是否被修改。 - 检查 OS 版本和 kernel 版本是否匹配。 - 检查内核是否被修改/感染。 diff -urN oec-hardware/scripts/oech oec-hardware_new/scripts/oech --- oec-hardware/scripts/oech 2021-09-13 11:43:59.390364522 +0800 +++ oec-hardware_new/scripts/oech 2021-09-13 11:42:03.409474243 +0800 @@ -12,6 +12,8 @@ # See the Mulan PSL v2 for more details. # Create: 2020-04-01 +"""Operation of client """ + import os import sys import fcntl @@ -25,22 +27,31 @@ class CertLock: + """ + certlock + """ def __init__(self, filename): self.filename = filename - self.fd = open(filename, 'w') + self.fd_obj = open(filename, 'w') def acquire(self): + """ + acquire + """ try: - fcntl.flock(self.fd, fcntl.LOCK_EX|fcntl.LOCK_NB) + fcntl.flock(self.fd_obj, fcntl.LOCK_EX|fcntl.LOCK_NB) return True except IOError: return False def release(self): - fcntl.flock(self.fd, fcntl.LOCK_UN) + """ + release + """ + fcntl.flock(self.fd_obj, fcntl.LOCK_UN) def __del__(self): - self.fd.close() + self.fd_obj.close() if __name__ == '__main__': @@ -80,4 +91,3 @@ lock.release() sys.exit(0) - diff -urN oec-hardware/scripts/oech-server.service oec-hardware_new/scripts/oech-server.service --- oec-hardware/scripts/oech-server.service 2021-09-13 11:43:59.390364522 +0800 +++ oec-hardware_new/scripts/oech-server.service 2021-09-13 11:42:03.409474243 +0800 @@ -6,6 +6,8 @@ Type=notify ExecStartPre=/usr/share/oech/lib/server/oech-server-pre.sh ExecStart=/usr/local/bin/uwsgi --ini /usr/share/oech/lib/server/uwsgi.ini +ExecStop=/usr/local/bin/uwsgi --stop /usr/share/oech/lib/server/uwsgi.pid + [Install] WantedBy=multi-user.target diff -urN oec-hardware/server/server.py oec-hardware_new/server/server.py --- oec-hardware/server/server.py 2021-09-13 11:43:59.394364553 +0800 +++ oec-hardware_new/server/server.py 2021-09-13 11:42:03.409474243 +0800 @@ -12,6 +12,8 @@ # See the Mulan PSL v2 for more details. # Create: 2020-04-01 +"""server""" + import os import json import time @@ -40,27 +42,42 @@ @app.errorhandler(400) -def bad_request(e): +def bad_request(): + """ + bad request + """ return render_template('error.html', error='400 - Bad Request'), 400 @app.errorhandler(404) def page_not_found(e): + """ + page not fount + """ return render_template('error.html', error='404 - Page Not Found'), 404 @app.errorhandler(500) def internal_server_error(e): + """ + internal server error + """ return render_template('error.html', error='500 - Internal Server Error'), 500 @app.route('/') def index(): + """ + index + """ return render_template('index.html') @app.route('/results') def get_results(): + """ + get results + """ results = {} for host in next(os.walk(dir_results))[1]: dir_host = os.path.join(dir_results, host) @@ -85,11 +102,11 @@ json_info = os.path.join(dir_job, 'compatibility.json') json_results = os.path.join(dir_job, 'factory.json') try: - with open(json_info, 'r') as f: - info = json.load(f) - with open(json_results, 'r') as f: - results = json.load(f) - except Exception as e: + with open(json_info, 'r') as file_content: + info = json.load(file_content) + with open(json_results, 'r') as file_content: + results = json.load(file_content) + except Exception: abort(404) return render_template('job.html', host=host, id=oec_id, job=job, info=info, results=results) @@ -107,9 +124,9 @@ dir_job = os.path.join(dir_results, host, oec_id, job) json_results = os.path.join(dir_job, 'factory.json') try: - with open(json_results, 'r') as f: - results = json.load(f) - except Exception as e: + with open(json_results, 'r') as file_content: + results = json.load(file_content) + except Exception: abort(404) for testcase in results: device = testcase.get('device') @@ -131,9 +148,9 @@ dir_job = os.path.join(dir_results, host, oec_id, job) json_devices = os.path.join(dir_job, 'device.json') try: - with open(json_devices, 'r') as f: - devices = json.load(f) - except Exception as e: + with open(json_devices, 'r') as file_content: + devices = json.load(file_content) + except Exception: abort(404) return render_template('devices.html', devices=devices) @@ -169,9 +186,9 @@ if not os.path.exists(logpath): logpath = os.path.join(dir_job, 'job.log') try: - with open(logpath, 'r') as f: - log = f.read().split('\n') - except Exception as e: + with open(logpath, 'r') as file_content: + log = file_content.read().split('\n') + except Exception: abort(404) return render_template('log.html', name=name, log=log) @@ -189,12 +206,12 @@ tar_job = dir_job + '.tar.gz' json_cert = os.path.join(dir_job, 'compatibility.json') try: - with open(json_cert, 'r') as f: - cert = json.load(f) - with open(tar_job, 'rb') as f: - attachment = base64.b64encode(f.read()) - except Exception as e: - print(e) + with open(json_cert, 'r') as file_content: + cert = json.load(file_content) + with open(tar_job, 'rb') as file_content: + attachment = base64.b64encode(file_content.read()) + except Exception as concrete_error: + print(concrete_error) abort(500) form = {} @@ -211,9 +228,9 @@ try: req = Request(url, data=data, headers=headers) res = urlopen(req) - except HTTPError as e: - print(e) - res = e + except HTTPError as concrete_error: + print(concrete_error) + res = concrete_error if res.code == 200: flash('Submit Successful', 'success') @@ -242,11 +259,11 @@ if not os.path.exists(dir_job): os.makedirs(dir_job) try: - with open(tar_job, 'wb') as f: - f.write(base64.b64decode(filetext)) + with open(tar_job, 'wb') as file_content: + file_content.write(base64.b64decode(filetext)) os.system("tar xf '%s' -C '%s'" % (tar_job, os.path.dirname(dir_job))) - except Exception as e: - print(e) + except Exception as concrete_error: + print(concrete_error) abort(400) return render_template('upload.html', host=host, id=oec_id, job=job, filetext=filetext, ret='Successful') @@ -254,17 +271,26 @@ @app.route('/files') def get_files(): + """ + get files + """ files = os.listdir(dir_files) return render_template('files.html', files=files) @app.route('/files/') def download_file(path): + """ + download file + """ return send_from_directory('files', path, as_attachment=True) @app.route('/api/file/upload', methods=['GET', 'POST']) def upload_file(): + """ + upload_file + """ filename = request.values.get('filename', '') filetext = request.values.get('filetext', '') if not(all([filename, filetext])): @@ -275,10 +301,10 @@ if not os.path.exists(dir_files): os.makedirs(dir_files) try: - with open(filepath, 'wb') as f: - f.write(base64.b64decode(filetext)) - except Exception as e: - print(e) + with open(filepath, 'wb') as file_content: + file_content.write(base64.b64decode(filetext)) + except Exception as concrete_error: + print(concrete_error) abort(400) return render_template('upload.html', filename=filename, filetext=filetext, ret='Successful') @@ -286,6 +312,9 @@ @app.route('/api/', methods=['GET', 'POST']) def test_server(act): + """ + test server + """ valid_commands = ['rping', 'rcopy', 'ib_read_bw', 'ib_write_bw', 'ib_send_bw', 'qperf'] cmd = request.values.get('cmd', '') @@ -295,7 +324,7 @@ abort(400) if act == 'start': - if 'rping' == cmd[0]: + if cmd[0] == 'rping': cmd = ['rping', '-s'] if 'ib_' in cmd[0]: @@ -309,7 +338,7 @@ abort(400) cmd.extend(['-d', ibdev, '-i', ibport]) elif act == 'stop': - if 'all' == cmd[0]: + if cmd[0] == 'all': cmd = ['killall', '-9'] + valid_commands else: cmd = ['killall', '-9', cmd[0]] @@ -351,11 +380,10 @@ ibport = str(ibport) return ibdev, ibport - except Exception as e: - print(e) + except Exception as concrete_error: + print(concrete_error) return None, None if __name__ == '__main__': app.run(host='0.0.0.0', port=80) - diff -urN oec-hardware/server/uwsgi.ini oec-hardware_new/server/uwsgi.ini --- oec-hardware/server/uwsgi.ini 2021-09-13 11:43:59.394364553 +0800 +++ oec-hardware_new/server/uwsgi.ini 2021-09-13 11:42:03.409474243 +0800 @@ -1,6 +1,11 @@ [uwsgi] socket = 127.0.0.1:8080 chdir = /usr/share/oech/lib/server -wsgi-file = server.py +wsgi-file = /usr/share/oech/lib/server/server.py callable = app processes = 4 +pidfile = /usr/share/oech/lib/server/uwsgi.pid +master = true +buffer-size = 65536 + + diff -urN oec-hardware/tests/acpi/acpi.py oec-hardware_new/tests/acpi/acpi.py --- oec-hardware/tests/acpi/acpi.py 2021-09-13 11:43:59.394364553 +0800 +++ oec-hardware_new/tests/acpi/acpi.py 2021-09-13 11:42:03.409474243 +0800 @@ -12,6 +12,8 @@ # See the Mulan PSL v2 for more details. # Create: 2020-04-01 +"""acpi test""" + from hwcompatible.test import Test from hwcompatible.command import Command @@ -25,9 +27,12 @@ self.requirements = ["acpica-tools"] def test(self): + """ + start test + """ try: Command("acpidump").echo() return True - except Exception as e: - print(e) + except Exception as concrete_error: + print(concrete_error) return False diff -urN oec-hardware/tests/cdrom/cdrom.py oec-hardware_new/tests/cdrom/cdrom.py --- oec-hardware/tests/cdrom/cdrom.py 2021-09-13 11:43:59.394364553 +0800 +++ oec-hardware_new/tests/cdrom/cdrom.py 2021-09-13 12:54:30.082712105 +0800 @@ -12,6 +12,8 @@ # See the Mulan PSL v2 for more details. # Create: 2020-04-01 +"""cdrom test""" + import os import sys import time @@ -34,7 +36,7 @@ self.device = None self.type = None self.args = None - self.ui = CommandUI() + self.com_ui = CommandUI() self.test_dir = "/usr/share/doc" def setup(self, args=None): @@ -62,12 +64,13 @@ devname = self.device.get_property("DEVNAME") Command("eject %s" % devname).run(ignore_errors=True) while True: - print("Please insert %s disc into %s, then close the tray manually." % (self.type.lower(), devname)) + print("Please insert %s disc into %s, then close the tray manually."\ + % (self.type.lower(), devname)) if self.method == "write_test": print(" tips:disc should be new.") elif self.method == "read_test": print(" tips:disc should not be blank.") - if self.ui.prompt_confirm("Done well?"): + if self.com_ui.prompt_confirm("Done well?"): break Command("eject -t %s" % devname).run(ignore_errors=True) print("Waiting media..).") @@ -86,7 +89,7 @@ if not device: return None - bd_types = ["BD_RE", "BD_R", "BD"] + bd_types = ["BD_RE", "BD_R", "BD"] dvd_types = ["DVD_RW", "DVD_PLUS_RW", "DVD_R", "DVD_PLUS_R", "DVD"] cd_types = ["CD_RW", "CD_R", "CD"] for bd_type in bd_types: @@ -99,7 +102,7 @@ if device.get_property("ID_CDROM_" + cd_type) == "1": return cd_type - print("Can not find proper test-type for %s." % device.get_name()) + print("Can not find pr)oper test-type for %s." % device.get_name()) return None def get_mode(self, device_type): @@ -148,7 +151,7 @@ self.reload_disc(devname) sys.stdout.flush() return self.write_test() - except CertCommandError as e: + except CertCommandError: return False def write_test(self): @@ -168,18 +171,20 @@ write_opts = "-sao" try: command = Command("cdrecord dev=%s -checkdrive" % devname) - modes = command.get_str(regex="^Supported modes[^:]*:(?P.*$)", regex_group="modes", + modes = command.get_str(regex="^Supported modes[^:]*:(?P.*$)", \ + regex_group="modes", single_line=False, ignore_errors=True) if "TAO" in modes: write_opts = "-tao" if "SAO" in modes: write_opts = "-sao" - flags = command.get_str(regex="^Driver flags[^:]*:(?P.*$)", regex_group="flags", + flags = command.get_str(regex="^Driver flags[^:]*:(?P.*$)", \ + regex_group="flags", single_line=False, ignore_errors=True) if "BURNFREE" in flags: write_opts += " driveropts=burnfree" - except CertCommandError as e: - print(e) + except CertCommandError as concrete_error: + print(concrete_error) size = Command("mkisofs -quiet -R -print-size %s " % self.test_dir).get_str() blocks = int(size) @@ -189,7 +194,7 @@ self.reload_disc(devname) sys.stdout.flush() return True - except CertCommandError as e: + except CertCommandError as concrete_error: return False def read_test(self): @@ -229,8 +234,8 @@ Command("umount ./mnt_cdrom").run(ignore_errors=True) Command("rm -rf ./mnt_cdrom ./device_dir").run(ignore_errors=True) return return_code - except CertCommandError as e: - print(e) + except CertCommandError as concrete_error: + print(concrete_error) return False def cmp_tree(self, dir1, dir2): @@ -246,7 +251,7 @@ try: Command("diff -r %s %s" % (dir1, dir2)).run() return True - except CertCommandError as e: + except CertCommandError: print("Error: file comparison failed.") return False @@ -265,16 +270,16 @@ Command("eject %s" % device).run() print("tray ejected.") sys.stdout.flush() - except: + except Exception: pass try: Command("eject -t %s" % device).run() print("tray auto-closed.\n") sys.stdout.flush() - except: + except Exception: print("Could not auto-close the tray, please close the tray manually.") - self.ui.prompt_confirm("Done well?") + self.com_ui.prompt_confirm("Done well?") time.sleep(20) return True diff -urN oec-hardware/tests/clock/clock.py oec-hardware_new/tests/clock/clock.py --- oec-hardware/tests/clock/clock.py 2021-09-13 11:43:59.394364553 +0800 +++ oec-hardware_new/tests/clock/clock.py 2021-09-13 11:42:03.409474243 +0800 @@ -12,8 +12,11 @@ # See the Mulan PSL v2 for more details. # Create: 2020-04-01 +"""clock test""" + import os +from hwcompatible.command import Command, CertCommandError from hwcompatible.test import Test clock_dir = os.path.dirname(os.path.realpath(__file__)) @@ -28,7 +31,12 @@ Clock test case :return: """ - return 0 == os.system("cd %s; ./clock" % clock_dir) + try: + Command("cd %s; ./clock" % clock_dir).echo() + return True + except CertCommandError as concrete_error: + print(concrete_error) + return False if __name__ == '__main__': diff -urN oec-hardware/tests/cpufreq/cal.py oec-hardware_new/tests/cpufreq/cal.py --- oec-hardware/tests/cpufreq/cal.py 2021-09-13 11:43:59.394364553 +0800 +++ oec-hardware_new/tests/cpufreq/cal.py 2021-09-13 11:42:03.409474243 +0800 @@ -12,11 +12,14 @@ # See the Mulan PSL v2 for more details. # Create: 2020-04-01 +"""call test_case""" + import decimal import time def cal(): + """call test_case""" decimal.getcontext().prec = 1000 one = decimal.Decimal(1) for i in range(1000): diff -urN oec-hardware/tests/cpufreq/cpufreq.py oec-hardware_new/tests/cpufreq/cpufreq.py --- oec-hardware/tests/cpufreq/cpufreq.py 2021-09-13 11:43:59.394364553 +0800 +++ oec-hardware_new/tests/cpufreq/cpufreq.py 2021-09-13 12:53:08.458080148 +0800 @@ -12,6 +12,8 @@ # See the Mulan PSL v2 for more details. # Create: 2020-04-01 +"""cpufreq test""" + from random import randint from time import sleep @@ -21,6 +23,9 @@ class CPU: + """ + cpufreq test + """ def __init__(self): self.cpu = None self.nums = None @@ -39,8 +44,8 @@ cmd = Command("lscpu") try: nums = cmd.get_str(r'^CPU\S*:\s+(?P\d+)$', 'cpus', False) - except Exception as e: - print(e) + except Exception as concrete_error: + print(concrete_error) return False self.nums = int(nums) self.list = range(self.nums) @@ -48,16 +53,16 @@ cmd = Command("cat /sys/devices/system/cpu/cpu0/cpufreq/scaling_max_freq") try: max_freq = cmd.get_str() - except Exception as e: - print(e) + except Exception as concrete_error: + print(concrete_error) return False self.max_freq = int(max_freq) cmd = Command("cat /sys/devices/system/cpu/cpu0/cpufreq/scaling_min_freq") try: min_freq = cmd.get_str() - except Exception as e: - print(e) + except Exception as concrete_error: + print(concrete_error) return False self.min_freq = int(min_freq) @@ -74,8 +79,8 @@ try: cmd.run() return cmd.returncode - except Exception as e: - print(e) + except Exception as concrete_error: + print(concrete_error) return False def get_freq(self, cpu): @@ -87,8 +92,8 @@ cmd = Command("cpupower -c %s frequency-info -w" % cpu) try: return int(cmd.get_str(r'.* frequency: (?P\d+) .*', 'freq', False)) - except Exception as e: - print(e) + except Exception as concrete_error: + print(concrete_error) return False def set_governor(self, governor, cpu='all'): @@ -102,8 +107,8 @@ try: cmd.run() return cmd.returncode - except Exception as e: - print(e) + except Exception as concrete_error: + print(concrete_error) return False def get_governor(self, cpu): @@ -115,8 +120,8 @@ cmd = Command("cpupower -c %s frequency-info -p" % cpu) try: return cmd.get_str(r'.* governor "(?P\w+)".*', 'governor', False) - except Exception as e: - print(e) + except Exception as concrete_error: + print(concrete_error) return False def find_path(self, parent_dir, target_name): @@ -130,8 +135,8 @@ try: cmd.run() return cmd.returncode - except Exception as e: - print(e) + except Exception as concrete_error: + print(concrete_error) return False @@ -141,7 +146,8 @@ """ def __init__(self, cpu): self.cpu = cpu - self.process = Command("taskset -c {} python -u {}/cpufreq/cal.py".format(self.cpu, CertEnv.testdirectoy)) + self.process = Command("taskset -c {} python -u {}/cpufreq/cal.py".\ + format(self.cpu, CertEnv.testdirectoy)) self.returncode = None def run(self): diff -urN oec-hardware/tests/cpufreq/cpufreq.py.orig oec-hardware_new/tests/cpufreq/cpufreq.py.orig --- oec-hardware/tests/cpufreq/cpufreq.py.orig 1970-01-01 08:00:00.000000000 +0800 +++ oec-hardware_new/tests/cpufreq/cpufreq.py.orig 2021-09-13 11:42:03.409474243 +0800 @@ -0,0 +1,466 @@ +#!/usr/bin/env python +# coding: utf-8 + +# Copyright (c) 2020 Huawei Technologies Co., Ltd. +# oec-hardware is licensed under the Mulan PSL v2. +# You can use this software according to the terms and conditions of the Mulan PSL v2. +# You may obtain a copy of Mulan PSL v2 at: +# http://license.coscl.org.cn/MulanPSL2 +# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR +# PURPOSE. +# See the Mulan PSL v2 for more details. +# Create: 2020-04-01 + +"""cpufreq test""" + +from random import randint +from time import sleep + +from hwcompatible.env import CertEnv +from hwcompatible.test import Test +from hwcompatible.command import Command + + +class CPU: + """ + cpufreq test + """ + def __init__(self): + self.cpu = None + self.nums = None + self.list = None + self.numa_nodes = None + self.governors = None + self.original_governor = None + self.max_freq = None + self.min_freq = None + + def get_info(self): + """ + Get CPU info + :return: + """ + cmd = Command("lscpu") + try: + nums = cmd.get_str(r'^CPU\S*:\s+(?P\d+)$', 'cpus', False) + except Exception as concrete_error: + print(concrete_error) + return False + self.nums = int(nums) + self.list = range(self.nums) + + cmd = Command("cat /sys/devices/system/cpu/cpu0/cpufreq/scaling_max_freq") + try: + max_freq = cmd.get_str() + except Exception as concrete_error: + print(concrete_error) + return False + self.max_freq = int(max_freq) + + cmd = Command("cat /sys/devices/system/cpu/cpu0/cpufreq/scaling_min_freq") + try: + min_freq = cmd.get_str() + except Exception as concrete_error: + print(concrete_error) + return False + self.min_freq = int(min_freq) + + return True + + def set_freq(self, freq, cpu='all'): + """ + Set CPU frequency + :param freq: + :param cpu: + :return: + """ + cmd = Command("cpupower -c %s frequency-set --freq %s" % (cpu, freq)) + try: + cmd.run() + return cmd.returncode + except Exception as concrete_error: + print(concrete_error) + return False + + def get_freq(self, cpu): + """ + Get CPU frequency + :param cpu: + :return: + """ + cmd = Command("cpupower -c %s frequency-info -w" % cpu) + try: + return int(cmd.get_str(r'.* frequency: (?P\d+) .*', 'freq', False)) + except Exception as concrete_error: + print(concrete_error) + return False + + def set_governor(self, governor, cpu='all'): + """ + Set the frequency governor mode of CPU + :param governor: + :param cpu: + :return: + """ + cmd = Command("cpupower -c %s frequency-set --governor %s" % (cpu, governor)) + try: + cmd.run() + return cmd.returncode + except Exception as concrete_error: + print(concrete_error) + return False + + def get_governor(self, cpu): + """ + Get cpu governor + :param cpu: + :return: + """ + cmd = Command("cpupower -c %s frequency-info -p" % cpu) + try: + return cmd.get_str(r'.* governor "(?P\w+)".*', 'governor', False) + except Exception as concrete_error: + print(concrete_error) + return False + + def find_path(self, parent_dir, target_name): + """ + Find the target path from the specified directory + :param parent_dir: + :param target_name: + :return: + """ + cmd = Command("find %s -name %s" % (parent_dir, target_name)) + try: + cmd.run() + return cmd.returncode + except Exception as concrete_error: + print(concrete_error) + return False + + +class Load: + """ + Let a program run on a specific CPU + """ + def __init__(self, cpu): + self.cpu = cpu + self.process = Command("taskset -c {} python -u {}/cpufreq/cal.py".\ + format(self.cpu, CertEnv.testdirectoy)) + self.returncode = None + + def run(self): + """ + Process started + :return: + """ + self.process.start() # background + + def get_runtime(self): + """ + Get the running time of the process + :return: + """ + if not self.process: + return None + + while self.returncode is None: + self.returncode = self.process.poll() + if self.returncode == 0: + line = self.process.readline() + return float(line) + else: + return False + + +class CPUFreqTest(Test): + """ + CPU frequency test + """ + def __init__(self): + Test.__init__(self) + self.requirements = ['util-linux', 'kernel-tools'] + self.cpu = CPU() + self.original_governor = self.cpu.get_governor(0) + + def test_userspace(self): + """ + userspace mode of testing CPU frequency + :return: + """ + target_cpu = randint(0, self.cpu.nums-1) + target_freq = randint(self.cpu.min_freq, self.cpu.max_freq) + if self.cpu.set_freq(target_freq, cpu=target_cpu) != 0: + print("[X] Set CPU%s to freq %d failed." % (target_cpu, target_freq)) + return False + print("[.] Set CPU%s to freq %d." % (target_cpu, target_freq)) + target_cpu_freq = self.cpu.get_freq(target_cpu) + print("[.] Current freq of CPU%s is %d." % (target_cpu, target_cpu_freq)) + + target_cpu_governor = self.cpu.get_governor(target_cpu) + if target_cpu_governor != 'userspace': + print("[X] The governor of CPU%s(%s) is not userspace." % + (target_cpu, target_cpu_governor)) + return False + print("[.] The governor of CPU%s is %s." % + (target_cpu, target_cpu_governor)) + + # min_freq -> max_runtime + self.cpu.set_freq(self.cpu.min_freq) + load_list = [] + runtime_list = [] + for cpu in self.cpu.list: + load_test = Load(cpu) + load_test.run() + load_list.append(load_test) + for cpu in self.cpu.list: + runtime = load_list[cpu].get_runtime() + runtime_list.append(runtime) + max_average_runtime = 1.0 * sum(runtime_list) / len(runtime_list) + if max_average_runtime == 0: + print("[X] Max average time is 0.") + return False + print("[.] Max average time of all CPUs userspace load test: %.2f" % + max_average_runtime) + + # max_freq -> min_runtime + self.cpu.set_freq(self.cpu.max_freq) + load_list = [] + runtime_list = [] + for cpu in self.cpu.list: + load_test = Load(cpu) + load_test.run() + load_list.append(load_test) + for cpu in self.cpu.list: + runtime = load_list[cpu].get_runtime() + runtime_list.append(runtime) + min_average_runtime = 1.0 * sum(runtime_list) / len(runtime_list) + if min_average_runtime == 0: + print("[X] Min average time is 0.") + return False + print("[.] Min average time of all CPUs userspace load test: %.2f" % + min_average_runtime) + + measured_speedup = 1.0 * max_average_runtime / min_average_runtime + expected_speedup = 1.0 * self.cpu.max_freq / self.cpu.min_freq + tolerance = 1.0 + min_speedup = expected_speedup - (expected_speedup - 1.0) * tolerance + max_speedup = expected_speedup + (expected_speedup - 1.0) * tolerance + if not min_speedup < measured_speedup < max_speedup: + print("[X] The speedup(%.2f) is not between %.2f and %.2f" % + (measured_speedup, min_speedup, max_speedup)) + return False + print("[.] The speedup(%.2f) is between %.2f and %.2f" % + (measured_speedup, min_speedup, max_speedup)) + + return True + + def test_ondemand(self): + """ + ondemand mode of testing CPU frequency + :return: + """ + if self.cpu.set_governor('powersave') != 0: + print("[X] Set governor of all CPUs to powersave failed.") + return False + print("[.] Set governor of all CPUs to powersave.") + + if self.cpu.set_governor('ondemand') != 0: + print("[X] Set governor of all CPUs to ondemand failed.") + return False + print("[.] Set governor of all CPUs to ondemand.") + + target_cpu = randint(0, self.cpu.nums) + target_cpu_governor = self.cpu.get_governor(target_cpu) + if target_cpu_governor != 'ondemand': + print("[X] The governor of CPU%s(%s) is not ondemand." % + (target_cpu, target_cpu_governor)) + return False + print("[.] The governor of CPU%s is %s." % + (target_cpu, target_cpu_governor)) + + load_test = Load(target_cpu) + load_test.run() + sleep(1) + target_cpu_freq = self.cpu.get_freq(target_cpu) + if target_cpu_freq != self.cpu.max_freq: + print("[X] The freq of CPU%s(%d) is not scaling_max_freq(%d)." % + (target_cpu, target_cpu_freq, self.cpu.max_freq)) + return False + print("[.] The freq of CPU%s is scaling_max_freq(%d)." % + (target_cpu, target_cpu_freq)) + + load_test_time = load_test.get_runtime() + print("[.] Time of CPU%s ondemand load test: %.2f" % + (target_cpu, load_test_time)) + target_cpu_freq = self.cpu.get_freq(target_cpu) + if not target_cpu_freq <= self.cpu.max_freq: + print("[X] The freq of CPU%s(%d) is not less equal than %d." % + (target_cpu, target_cpu_freq, self.cpu.max_freq)) + return False + print("[.] The freq of CPU%s(%d) is less equal than %d." % + (target_cpu, target_cpu_freq, self.cpu.max_freq)) + + return True + + def test_conservative(self): + """ + conservative mode of testing CPU frequency + :return: + """ + if self.cpu.set_governor('powersave') != 0: + print("[X] Set governor of all CPUs to powersave failed.") + return False + print("[.] Set governor of all CPUs to powersave.") + + if self.cpu.set_governor('conservative') != 0: + print("[X] Set governor of all CPUs to conservative failed.") + return False + print("[.] Set governor of all CPUs to conservative.") + + target_cpu = randint(0, self.cpu.nums) + target_cpu_governor = self.cpu.get_governor(target_cpu) + if target_cpu_governor != 'conservative': + print("[X] The governor of CPU%s(%s) is not conservative." % + (target_cpu, target_cpu_governor)) + return False + print("[.] The governor of CPU%s is %s." % + (target_cpu, target_cpu_governor)) + + load_test = Load(target_cpu) + load_test.run() + sleep(1) + target_cpu_freq = self.cpu.get_freq(target_cpu) + if not self.cpu.min_freq < target_cpu_freq < self.cpu.max_freq: + print("[X] The freq of CPU%s(%d) is not between %d~%d." % + (target_cpu, target_cpu_freq, self.cpu.min_freq, self.cpu.max_freq)) + return False + print("[.] The freq of CPU%s(%d) is between %d~%d." % + (target_cpu, target_cpu_freq, self.cpu.min_freq, self.cpu.max_freq)) + + load_test_time = load_test.get_runtime() + print("[.] Time of CPU%s conservative load test: %.2f" % + (target_cpu, load_test_time)) + target_cpu_freq = self.cpu.get_freq(target_cpu) + print("[.] Current freq of CPU%s is %d." % (target_cpu, target_cpu_freq)) + + return True + + def test_powersave(self): + """ + powersave mode of testing CPU frequency + :return: + """ + if self.cpu.set_governor('powersave') != 0: + print("[X] Set governor of all CPUs to powersave failed.") + return False + print("[.] Set governor of all CPUs to powersave.") + + target_cpu = randint(0, self.cpu.nums) + target_cpu_governor = self.cpu.get_governor(target_cpu) + if target_cpu_governor != 'powersave': + print("[X] The governor of CPU%s(%s) is not powersave." % + (target_cpu, target_cpu_governor)) + return False + print("[.] The governor of CPU%s is %s." % + (target_cpu, target_cpu_governor)) + + target_cpu_freq = self.cpu.get_freq(target_cpu) + if target_cpu_freq != self.cpu.min_freq: + print("[X] The freq of CPU%s(%d) is not scaling_min_freq(%d)." % + (target_cpu, target_cpu_freq, self.cpu.min_freq)) + return False + print("[.] The freq of CPU%s is %d." % (target_cpu, target_cpu_freq)) + + load_test = Load(target_cpu) + load_test.run() + load_test_time = load_test.get_runtime() + print("[.] Time of CPU%s powersave load test: %.2f" % + (target_cpu, load_test_time)) + target_cpu_freq = self.cpu.get_freq(target_cpu) + print("[.] Current freq of CPU%s is %d." % (target_cpu, target_cpu_freq)) + + return True + + def test_performance(self): + """ + Performance mode of testing CPU frequency + :return: + """ + if self.cpu.set_governor('performance') != 0: + print("[X] Set governor of all CPUs to performance failed.") + return False + print("[.] Set governor of all CPUs to performance.") + + target_cpu = randint(0, self.cpu.nums) + target_cpu_governor = self.cpu.get_governor(target_cpu) + if target_cpu_governor != 'performance': + print("[X] The governor of CPU%s(%s) is not performance." % + (target_cpu, target_cpu_governor)) + return False + print("[.] The governor of CPU%s is %s." % + (target_cpu, target_cpu_governor)) + + target_cpu_freq = self.cpu.get_freq(target_cpu) + if target_cpu_freq != self.cpu.max_freq: + print("[X] The freq of CPU%s(%d) is not scaling_max_freq(%d)." % + (target_cpu, target_cpu_freq, self.cpu.max_freq)) + return False + print("[.] The freq of CPU%s is %d." % (target_cpu, target_cpu_freq)) + + load_test = Load(target_cpu) + load_test.run() + load_test_time = load_test.get_runtime() + print("[.] Time of CPU%s performance load test: %.2f" % + (target_cpu, load_test_time)) + target_cpu_freq = self.cpu.get_freq(target_cpu) + print("[.] Current freq of CPU%s is %d." % (target_cpu, target_cpu_freq)) + + return True + + def test(self): + """ + Test case + :return: + """ + if not self.cpu.get_info(): + print("[X] Fail to get CPU info." + " Please check if the CPU supports cpufreq.") + return False + + ret = True + print("") + print("[.] Test userspace") + if not self.test_userspace(): + print("[X] Test userspace FAILED") + ret = False + print("") + print("[.] Test ondemand") + if not self.test_ondemand(): + print("[X] Test ondemand FAILED") + ret = False + print("") + print("[.] Test conservative") + if not self.test_conservative(): + print("[X] Test conservative FAILED") + ret = False + print("") + print("[.] Test powersave") + if not self.test_powersave(): + print("[X] Test powersave FAILED") + ret = False + print("") + print("[.] Test performance") + if not self.test_performance(): + print("[X] Test performance FAILED") + ret = False + + self.cpu.set_governor(self.original_governor) + return ret + + +if __name__ == "__main__": + t = CPUFreqTest() + t.setup() + t.test() diff -urN oec-hardware/tests/disk/disk.py oec-hardware_new/tests/disk/disk.py --- oec-hardware/tests/disk/disk.py 2021-09-13 11:43:59.394364553 +0800 +++ oec-hardware_new/tests/disk/disk.py 2021-09-13 11:42:03.409474243 +0800 @@ -12,6 +12,8 @@ # See the Mulan PSL v2 for more details. # Create: 2020-04-01 +"""disk test""" + import os import sys import shutil @@ -23,12 +25,14 @@ class DiskTest(Test): - + """ + disk test + """ def __init__(self): Test.__init__(self) self.disks = list() self.filesystems = ["ext4"] - self.ui = CommandUI() + self.com_ui = CommandUI() def setup(self, args=None): """ @@ -52,18 +56,22 @@ Command("cat /proc/mdstat").echo(ignore_errors=True) sys.stdout.flush() print("\n") - except Exception as e: + except Exception as concrete_error: print("Warning: could not get disk info") - print(e) + print(concrete_error) def test(self): + """ + start test + """ self.get_disk() if len(self.disks) == 0: print("No suite disk found to test.") return False self.disks.append("all") - disk = self.ui.prompt_edit("Which disk would you like to test: ", self.disks[0], self.disks) + disk = self.com_ui.prompt_edit("Which disk would you like to test: ",\ + self.disks[0], self.disks) return_code = True if disk == "all": for disk in self.disks[:-1]: @@ -79,12 +87,16 @@ return return_code def get_disk(self): + """ + get disk info + """ self.disks = list() disks = list() devices = CertDevice().get_devices() for device in devices: - if (device.get_property("DEVTYPE") == "disk" and not device.get_property("ID_TYPE")) or \ - device.get_property("ID_TYPE") == "disk": + if (device.get_property("DEVTYPE") == "disk" and not \ + device.get_property("ID_TYPE")) or device.\ + get_property("ID_TYPE") == "disk": if "/host" in device.get_property("DEVPATH"): disks.append(device.get_name()) @@ -123,6 +135,9 @@ print("These disks %s are in use now, skip them." % "|".join(un_suitable)) def raw_test(self, disk): + """ + raw test + """ print("\n#############") print("%s raw IO test" % disk) device = "/dev/" + disk @@ -147,7 +162,8 @@ return False print("\nStarting rand raw IO test...") - opts = "-direct=1 -iodepth 4 -rw=randrw -rwmixread=50 -group_reporting -name=file -runtime=300" + opts = "-direct=1 -iodepth 4 -rw=randrw -rwmixread=50 " \ + "-group_reporting -name=file -runtime=300" if not self.do_fio(device, size, opts): print("%s rand raw IO test fail." % device) print("#############") @@ -157,6 +173,9 @@ return True def vfs_test(self, disk): + """ + vfs test + """ print("\n#############") print("%s vfs test" % disk) device = "/dev/" + disk @@ -179,12 +198,12 @@ path = os.path.join(os.getcwd(), "vfs_test") return_code = True - for fs in self.filesystems: + for file_sys in self.filesystems: try: - print("\nFormatting %s to %s ..." % (device, fs)) + print("\nFormatting %s to %s ..." % (device, file_sys)) Command("umount %s" % device).echo(ignore_errors=True) - Command("mkfs -t %s -F %s 2>/dev/null" % (fs, device)).echo() - Command("mount -t %s %s %s" % (fs, device, "vfs_test")).echo() + Command("mkfs -t %s -F %s 2>/dev/null" % (file_sys, device)).echo() + Command("mount -t %s %s %s" % (file_sys, device, "vfs_test")).echo() print("\nStarting sequential vfs IO test...") opts = "-direct=1 -iodepth 4 -rw=rw -rwmixread=50 -name=directoy -runtime=300" @@ -197,8 +216,8 @@ if not self.do_fio(path, size, opts): return_code = False break - except Exception as e: - print(e) + except Exception as concrete_error: + print(concrete_error) return_code = False break @@ -208,17 +227,20 @@ return return_code def do_fio(self, filepath, size, option): + """ + fio test + """ if os.path.isdir(filepath): file_opt = "-directory=%s" % filepath else: file_opt = "-filename=%s" % filepath max_bs = 64 - bs = 4 - while bs <= max_bs: - if os.system("fio %s -size=%dK -bs=%dK %s" % (file_opt, size, bs, option)) != 0: + a_bs = 4 + while a_bs <= max_bs: + if os.system("fio %s -size=%dK -bs=%dK %s" % (file_opt, size, a_bs, option)) != 0: print("Error: %s fio failed." % filepath) return False print("\n") sys.stdout.flush() - bs = bs * 2 + a_bs = a_bs * 2 return True diff -urN oec-hardware/tests/ipmi/ipmi.py oec-hardware_new/tests/ipmi/ipmi.py --- oec-hardware/tests/ipmi/ipmi.py 2021-09-13 11:43:59.394364553 +0800 +++ oec-hardware_new/tests/ipmi/ipmi.py 2021-09-13 11:42:03.413474274 +0800 @@ -12,6 +12,8 @@ # See the Mulan PSL v2 for more details. # Create: 2020-04-01 +"""ipmi test""" + from hwcompatible.test import Test from hwcompatible.command import Command @@ -31,8 +33,9 @@ """ try: Command("systemctl start ipmi").run() - Command("systemctl status ipmi.service").get_str(regex="Active: active", single_line=False) - except: + Command("systemctl status ipmi.service").get_str(regex="Active: active", \ + single_line=False) + except Exception: print("ipmi service cant't be started") return False return True @@ -46,7 +49,7 @@ for cmd in cmd_list: try: Command(cmd).echo() - except: + except Exception: print("%s return error." % cmd) return False return True diff -urN oec-hardware/tests/kdump/kdump.py oec-hardware_new/tests/kdump/kdump.py --- oec-hardware/tests/kdump/kdump.py 2021-09-13 11:43:59.394364553 +0800 +++ oec-hardware_new/tests/kdump/kdump.py 2021-09-13 11:42:03.413474274 +0800 @@ -12,6 +12,8 @@ # See the Mulan PSL v2 for more details. # Create: 2020-04-01 +"""kdump test""" + import os import sys import time @@ -26,6 +28,7 @@ """ Kdump Test """ + def __init__(self): Test.__init__(self) self.pri = 9 @@ -42,7 +45,7 @@ """ try: Command("cat /proc/cmdline").get_str(r"crashkernel=[^\ ]*") - except: + except Exception: print("Error: no crashkernel found.") return False @@ -58,8 +61,9 @@ try: Command("systemctl restart kdump").run() - Command("systemctl status kdump").get_str(regex="Active: active", single_line=False) - except: + Command("systemctl status kdump").get_str(regex="Active: active", + single_line=False) + except Exception: print("Error: kdump service not working.") return False @@ -68,8 +72,8 @@ config.dump() print("#############") - ui = CommandUI() - if ui.prompt_confirm("System will reboot, are you ready?"): + com_ui = CommandUI() + if com_ui.prompt_confirm("System will reboot, are you ready?"): print("\ntrigger crash...") sys.stdout.flush() os.system("sync") @@ -89,8 +93,9 @@ if config.get_parameter("path"): self.vmcore_path = config.get_parameter("path") - dir_pattern = re.compile(r'(?P[0-9]+\.[0-9]+\.[0-9]+)-(?P[0-9]+[-.][0-9]+[-.][0-9]+)-' - r'(?P