From 01c845220663a2572b6559bc25b52da1b2863256 Mon Sep 17 00:00:00 2001 From: rabbitali Date: Wed, 30 Aug 2023 10:59:52 +0800 Subject: [PATCH 1/1] update query disk info func --- ceres/manages/collect_manage.py | 38 ++--- ceres/tests/manages/test_collect_manage.py | 163 +++++++++++++++++---- 2 files changed, 152 insertions(+), 49 deletions(-) diff --git a/ceres/manages/collect_manage.py b/ceres/manages/collect_manage.py index 3472903..145d6dc 100644 --- a/ceres/manages/collect_manage.py +++ b/ceres/manages/collect_manage.py @@ -17,6 +17,7 @@ import pwd import re from socket import AF_INET, SOCK_DGRAM, socket from typing import Any, Dict, List, Union +import xml.etree.ElementTree as ET from ceres.conf.constant import ( HOST_COLLECT_INFO_SUPPORT, @@ -305,30 +306,33 @@ class Collect: } ] """ - code, stdout, _ = execute_shell_command("lshw -json -c disk") + code, stdout, _ = execute_shell_command("lshw -xml -c disk") if code != CommandExitCode.SUCCEED: LOGGER.error(stdout) return [] - # Convert the command result to a json string - # lshw_data e.g "{...},{...},{...}" - lshw_data = f"[{stdout}]" - try: - disk_info_list = json.loads(lshw_data) - except json.decoder.JSONDecodeError: - LOGGER.warning("Json conversion error, " "please check command 'lshw -json -c disk'") - disk_info_list = [] + tree = ET.ElementTree(ET.fromstring(stdout)) + except ET.ParseError as error: + LOGGER.error(error) + LOGGER.warning("disk info parse error, please check command 'lshw -xml -c disk'") + return [] + + disk_list = tree.findall("node") + + if not disk_list: + return [] res = [] - if disk_info_list: - for disk_info in disk_info_list: - res.append( - { - "model": disk_info.get('description') or disk_info.get('product'), - "capacity": f"{disk_info.get('size', 0) // 10 ** 9}GB", - } - ) + for node in disk_list: + model = node.find("description") if node.find("product") is None else node.find("product") + size = node.find("size") + res.append( + { + "model": model.text if model is not None else "unknown", + "capacity": f"{int(size.text) / (1024**3)} GB" if size is not None else "unknown", + } + ) return res diff --git a/ceres/tests/manages/test_collect_manage.py b/ceres/tests/manages/test_collect_manage.py index b27af55..243aa4c 100644 --- a/ceres/tests/manages/test_collect_manage.py +++ b/ceres/tests/manages/test_collect_manage.py @@ -17,6 +17,7 @@ import pwd import unittest import warnings from unittest import mock +import xml.etree.ElementTree as ET from ceres.conf.constant import CommandExitCode from ceres.manages.collect_manage import Collect @@ -454,60 +455,158 @@ class TestCollectManage(unittest.TestCase): def test_get_disk_info_should_return_disk_info_when_shell_command_execute_succeed_and_only_contain_description( self, mock_execute_shell_command ): - mock_execute_shell_command.return_value = ( - CommandExitCode.SUCCEED, - '{"description": "ATA Disk", "size": 42949672960}', - "", - ) - self.assertEqual([{"model": "ATA Disk", "capacity": "42GB"}], Collect()._get_disk_info()) + cmd_output = """ + + + + + + + Virtual I/O device + 0 + virtio@3 + /dev/vda + 42949672960 + + + + + + + + Partitioned disk + MS-DOS partition table + + + + + + +""" + mock_execute_shell_command.return_value = CommandExitCode.SUCCEED, cmd_output, "" + self.assertEqual([{"model": "Virtual I/O device", "capacity": "40.0 GB"}], Collect()._get_disk_info()) @mock.patch('ceres.manages.collect_manage.execute_shell_command') def test_get_disk_info_should_return_disk_info_when_shell_command_execute_succeed_and_has_no_description_or_product( self, mock_execute_shell_command ): - mock_execute_shell_command.return_value = ( - CommandExitCode.SUCCEED, - '{"size": 42949672960}', - "", - ) - self.assertEqual([{"model": None, "capacity": "42GB"}], Collect()._get_disk_info()) + cmd_output = """ + + + + + + + 0 + virtio@3 + /dev/vda + 42949672960 + + + + + + + + Partitioned disk + MS-DOS partition table + + + + + + +""" + mock_execute_shell_command.return_value = CommandExitCode.SUCCEED, cmd_output, "" + self.assertEqual([{"model": "unknown", "capacity": "40.0 GB"}], Collect()._get_disk_info()) @mock.patch('ceres.manages.collect_manage.execute_shell_command') def test_get_disk_info_should_return_disk_info_when_shell_command_execute_succeed_and_contain_description_and_product( self, mock_execute_shell_command ): - mock_execute_shell_command.return_value = ( - CommandExitCode.SUCCEED, - '{"description": "ATA Disk", "size": 42949672960,"product": "MOCK PRODUCT"}', - "", - ) - self.assertEqual([{"model": "ATA Disk", "capacity": "42GB"}], Collect()._get_disk_info()) + cmd_output = """ + + + + + + + Virtual I/O device + ATA Disk + 0 + virtio@3 + /dev/vda + 42949672960 + + + + + + + + Partitioned disk + MS-DOS partition table + + + + + + +""" + mock_execute_shell_command.return_value = CommandExitCode.SUCCEED, cmd_output, "" + self.assertEqual([{"model": "ATA Disk", "capacity": "40.0 GB"}], Collect()._get_disk_info()) @mock.patch('ceres.manages.collect_manage.execute_shell_command') def test_get_disk_info_should_return_disk_info_when_shell_command_execute_succeed_and_only_contain_product( self, mock_execute_shell_command ): - mock_execute_shell_command.return_value = ( - CommandExitCode.SUCCEED, - '{"product": "MOCK PRODUCT", "size": 42949672960}', - "", - ) - self.assertEqual([{"model": "MOCK PRODUCT", "capacity": "42GB"}], Collect()._get_disk_info()) + cmd_output = """ + + + + + + + MOCK PRODUCT + 0 + virtio@3 + /dev/vda + 42949672960 + + + + + + + + Partitioned disk + MS-DOS partition table + + + + + + +""" + mock_execute_shell_command.return_value = CommandExitCode.SUCCEED, cmd_output, "" + self.assertEqual([{"model": "MOCK PRODUCT", "capacity": "40.0 GB"}], Collect()._get_disk_info()) @mock.patch('ceres.manages.collect_manage.execute_shell_command') def test_get_disk_info_should_return_disk_info_when_shell_command_execute_fail(self, mock_execute_shell_command): mock_execute_shell_command.return_value = CommandExitCode.FAIL, "", "" self.assertEqual([], Collect()._get_disk_info()) - @mock.patch.object(json, "loads") + @mock.patch.object(ET, "ElementTree") @mock.patch('ceres.manages.collect_manage.execute_shell_command') def test_get_disk_info_should_return_disk_info_when_shell_command_execute_succeed_but_decode_error( - self, mock_execute_shell_command, mock_json_loads + self, mock_execute_shell_command, mock_parse_xml ): - mock_execute_shell_command.return_value = ( - CommandExitCode.SUCCEED, - '{"product": "MOCK PRODUCT", "size": 42949672960}', - "", - ) - mock_json_loads.side_effect = json.decoder.JSONDecodeError('', '', int()) + mock_cmd_output = """ + + + + +""" + mock_execute_shell_command.return_value = CommandExitCode.SUCCEED, mock_cmd_output, "" + mock_parse_xml.side_effect = ET.ParseError self.assertEqual([], Collect()._get_disk_info()) -- 2.33.0