From b9368c117d8588d4f048f8e0113d5517bb55f00c Mon Sep 17 00:00:00 2001 From: wenxin Date: Wed, 23 Nov 2022 14:25:41 +0800 Subject: [PATCH] remove test cases that use the responses module --- zeus/tests/test_config_manager.py | 46 +------- .../test_host_manager/test_delete_host.py | 102 +----------------- .../test_host_manager/test_get_host_info.py | 56 ---------- .../test_repo_set.py | 43 -------- 4 files changed, 2 insertions(+), 245 deletions(-) diff --git a/zeus/tests/test_config_manager.py b/zeus/tests/test_config_manager.py index bd806a8..e41f109 100644 --- a/zeus/tests/test_config_manager.py +++ b/zeus/tests/test_config_manager.py @@ -21,13 +21,12 @@ import json import requests from flask import Flask -import responses import zeus from vulcanus.database.proxy import MysqlProxy from vulcanus.multi_thread_handler import MultiThreadHandler from zeus.account_manager.cache import UserCache, UserInfo -from vulcanus.restful.status import SUCCEED, PARAM_ERROR, SERVER_ERROR, DATABASE_CONNECT_ERROR +from vulcanus.restful.status import SUCCEED, PARAM_ERROR, DATABASE_CONNECT_ERROR from zeus.config_manager.view import get_file_content from zeus.database.proxy.host import HostProxy @@ -173,49 +172,6 @@ class TestConfigManage(unittest.TestCase): all_fail_file_list.extend(file_content.get("fail_files")) self.assertEqual(set(expecte_fail_file), set(all_fail_file_list)) - @responses.activate - def test_get_file_content_should_return_file_content_when_all_is_right(self): - mock_agrs = { - "host_id": "xx", - "config_file_list": "xx", - "address": "xx", - "header": "xx" - } - - mock_file_info = { - 'fail_files': [], - 'infos': [], - 'success_files': ['xx'], - } - responses.add(responses.POST, - 'http://xx/v1/ceres/file/collect', - json=mock_file_info, - status=SUCCEED, - content_type='application/json' - ) - res = get_file_content(mock_agrs) - self.assertEqual(None, res.get("config_file_list"), res) - - @responses.activate - def test_get_file_content_should_return_host_id_and_config_file_list_when_response_status_code_is_not_success( - self): - mock_agrs = { - "host_id": "xx", - "config_file_list": "xx", - "address": "xx", - "header": "xx" - } - - mock_file_info = {} - responses.add(responses.POST, - 'http://xx/v1/ceres/file/collect', - json=mock_file_info, - status=SERVER_ERROR, - content_type='application/json' - ) - res = get_file_content(mock_agrs) - self.assertEqual("xx", res.get("config_file_list"), res) - @mock.patch.object(requests, "post") def test_get_file_content_should_return_host_id_and_config_file_list_when_http_connect_failed( self, mock_request): diff --git a/zeus/tests/test_host_manager/test_delete_host.py b/zeus/tests/test_host_manager/test_delete_host.py index d4df9b5..376325b 100644 --- a/zeus/tests/test_host_manager/test_delete_host.py +++ b/zeus/tests/test_host_manager/test_delete_host.py @@ -14,11 +14,10 @@ import unittest from unittest import mock from flask import Flask -import responses from zeus import BLUE_POINT from zeus.database.proxy.host import HostProxy -from vulcanus.restful.status import SUCCEED, TOKEN_ERROR, DATABASE_CONNECT_ERROR +from vulcanus.restful.status import TOKEN_ERROR, DATABASE_CONNECT_ERROR app = Flask("check") for blue, api in BLUE_POINT: @@ -37,89 +36,6 @@ header_with_token = { class TestDeleteHost(unittest.TestCase): - @responses.activate - @mock.patch.object(HostProxy, 'delete_host') - @mock.patch.object(HostProxy, 'connect') - def test_delete_host_should_return_success_list_when_input_host_id_is_in_database_and_not_in_workflow( - self, mock_mysql_connect, mock_delete_host): - input_data = {'host_list': ['test_host_id_1', 'test_host_id_2', 'test_host_id_3']} - mock_mysql_connect.return_value = True - mock_delete_host.return_value = SUCCEED, { - 'succeed_list': ['test_host_id_1', 'test_host_id_2', 'test_host_id_3'], - 'fail_list': {}, - 'host_info': {'test_host_id_1': '', 'test_host_id_2': '', 'test_host_id_3': ''} - } - mock_check_json = { - 'code': 200, - 'msg': 'xxxxx', - 'result': {host_id: False for host_id in input_data['host_list']} - } - responses.add(responses.POST, - 'http://127.0.0.1:11112/check/workflow/host/exist', - json=mock_check_json, - status=200, - content_type='application/json' - ) - resp = client.delete('/manage/host/delete', json=input_data, headers=header_with_token) - self.assertEqual(input_data['host_list'], resp.json['succeed_list']) - - @responses.activate - @mock.patch.object(HostProxy, 'delete_host') - @mock.patch.object(HostProxy, 'connect') - def test_delete_host_should_return_fail_list_when_input_host_id_is_in_database_and_workflow( - self, mock_mysql_connect, mock_delete_host): - input_data = {'host_list': ['test_host_id_1', 'test_host_id_2', 'test_host_id_3']} - mock_mysql_connect.return_value = True - mock_delete_host.return_value = SUCCEED, { - 'succeed_list': [], - 'fail_list': {}, - 'host_info': {} - } - mock_check_json = { - 'code': 200, - 'msg': 'xxxxx', - 'result': {host_id: True for host_id in input_data['host_list']} - } - responses.add(responses.POST, - 'http://127.0.0.1:11112/check/workflow/host/exist', - json=mock_check_json, - status=200, - content_type='application/json' - ) - resp = client.delete('/manage/host/delete', json=input_data, headers=header_with_token) - expect_res = dict(zip(input_data['host_list'], - len(input_data['host_list']) * ("There are workflow in check",))) - self.assertEqual(expect_res, resp.json['fail_list']) - - @responses.activate - @mock.patch.object(HostProxy, 'delete_host') - @mock.patch.object(HostProxy, 'connect') - def test_delete_host_should_return_succeed_list_and_fail_list_when_part_of_input_host_id_is_in_database_and_workflow( - self, mock_mysql_connect, mock_delete_host): - input_data = {'host_list': ['test_host_id_1', 'test_host_id_2', 'test_host_id_3']} - mock_mysql_connect.return_value = True - mock_delete_host.return_value = SUCCEED, { - 'succeed_list': ['test_host_id_2'], - 'fail_list': {}, - 'host_info': {'test_host_id_2': ''} - } - mock_check_json = { - 'code': 200, - 'msg': 'xxxxx', - 'result': {'test_host_id_1': True, 'test_host_id_2': False, 'test_host_id_3': True} - } - responses.add(responses.POST, - 'http://127.0.0.1:11112/check/workflow/host/exist', - json=mock_check_json, - status=200, - content_type='application/json' - ) - resp = client.delete('/manage/host/delete', json=input_data, headers=header_with_token) - expect_fail_list = { - 'test_host_id_1':"There are workflow in check", - 'test_host_id_3':"There are workflow in check" - } - self.assertEqual(expect_fail_list, resp.json.get('fail_list'), resp.json) def test_delete_host_should_return_token_error_when_part_of_input_with_no_token(self): input_data = {'host_list': ['test_host_id_1', 'test_host_id_2', 'test_host_id_3']} @@ -131,22 +47,6 @@ class TestDeleteHost(unittest.TestCase): resp = client.delete('/manage/host/delete', headers=header_with_token) self.assertEqual(400, resp.status_code, resp.json) - @responses.activate - @mock.patch.object(HostProxy, 'connect') - def test_delete_host_should_return_fail_list_when_aops_check_cannot_be_accessed( - self, mock_mysql_connect): - input_data = {'host_list': ['test_host_id_1', 'test_host_id_2', 'test_host_id_3']} - mock_mysql_connect.return_value = True - responses.add(responses.POST, - 'http://127.0.0.1:11112/check/workflow/host/exist', - json={'code': 500, 'msg': 'xxxxxxxx'}, - status=500, - content_type='application/json' - ) - expect_res = dict(zip(input_data['host_list'],len(input_data['host_list'])*("query workflow fail",))) - resp = client.delete('/manage/host/delete', json=input_data, headers=header_with_token) - self.assertEqual(expect_res, resp.json.get('fail_list'), resp.json) - @mock.patch.object(HostProxy, 'connect') def test_delete_host_should_return_database_error_when_database_cannot_connect( self, mock_mysql_connect): diff --git a/zeus/tests/test_host_manager/test_get_host_info.py b/zeus/tests/test_host_manager/test_get_host_info.py index bd1da21..4b6a3cf 100644 --- a/zeus/tests/test_host_manager/test_get_host_info.py +++ b/zeus/tests/test_host_manager/test_get_host_info.py @@ -15,7 +15,6 @@ import unittest from unittest import mock import requests -import responses from flask import Flask from vulcanus.conf.constant import QUERY_HOST_DETAIL @@ -145,61 +144,6 @@ class TestGetHostInfo(unittest.TestCase): headers=header_with_token) self.assertEqual(PARAM_ERROR, response.json.get('code')) - @responses.activate - def test_get_host_info_should_return_host_info_when_all_is_right(self): - mock_args = { - "host_id": "mock_host_id1", - "info_type": ["cpu", "os", "memory", "disk"], - "address": "mock_address", - "headers": { - "content-type": "application/json", - "access_token": "host token" - } - } - mock_host_info = { - "cpu": {}, - "os": {}, - "memory": {}, - "disk": [{}] - } - responses.add(responses.POST, - 'http://mock_address/v1/ceres/host/info', - json={ - "code": SUCCEED, - "msg": "mock_msg", - "resp": mock_host_info - }, - status=SUCCEED, - content_type='application/json' - ) - result = GetHostInfo.get_host_info(mock_args) - self.assertEqual({"host_id": "mock_host_id1", "host_info": mock_host_info}, result) - - @responses.activate - def test_get_host_info_should_return_host_info_is_empty_when_ceres_server_has_some_error(self): - mock_args = { - "host_id": "mock_host_id1", - "info_type": ["cpu", "os", "memory", "disk"], - "address": "mock_address", - "headers": { - "content-type": "application/json", - "access_token": "host token" - } - } - - responses.add(responses.POST, - 'http://mock_address/v1/ceres/host/info', - json={ - "code": SERVER_ERROR, - "msg": "mock_msg", - "resp": {} - }, - status=SERVER_ERROR, - content_type='application/json' - ) - result = GetHostInfo.get_host_info(mock_args) - self.assertEqual({"host_id": "mock_host_id1", "host_info": {}}, result) - @mock.patch.object(requests, "post") def test_get_host_info_should_return_host_info_is_empty_when_http_connect_error( self, mock_request): diff --git a/zeus/tests/test_vulnerability_manage/test_repo_set.py b/zeus/tests/test_vulnerability_manage/test_repo_set.py index f4a244f..b8a7ec5 100644 --- a/zeus/tests/test_vulnerability_manage/test_repo_set.py +++ b/zeus/tests/test_vulnerability_manage/test_repo_set.py @@ -14,7 +14,6 @@ import json import unittest from unittest import mock -import responses from flask import Flask from vulcanus.conf.constant import EXECUTE_REPO_SET @@ -28,7 +27,6 @@ from vulcanus.restful.status import ( ) from zeus import BLUE_POINT from zeus.account_manager.cache import UserCache, UserInfo -from zeus.conf.constant import CERES_CVE_REPO_SET from zeus.vulnerability_manage.view import ExecuteRepoSetTask app = Flask("test") @@ -221,44 +219,3 @@ class TestRepoSet(unittest.TestCase): data=json.dumps(self.MOCK_ARGS), headers=self.HEADERS_WITH_TOKEN) self.assertEqual(DATABASE_QUERY_ERROR, response.json.get('code')) - - @responses.activate - def test_execute_task_should_return_succeed_result_when_visit_succeed(self): - mock_args = { - "headers": {}, - "host_id": "mock_host_id", - "check": False, - "host_name": "moK_host_name", - "host_ip": "mock_ip", - "address": "mock_address", - "task_info": { - "check_items": [], - "task_id": "mock_task_id", - "repo_info": { - "name": "mock_name", - "dest": "mock_dest", - "repo_content": "mock_content" - }, - "callback": '', - }, - "headers_apollo": {} - } - expect_result = { - 'code': SUCCEED, - 'msg': 'operate succeed', - 'host_id': 'mock_host_id', - 'task_id': 'mock_task_id', - 'host_name': 'moK_host_name', - 'host_ip': 'mock_ip', - 'repo': 'mock_name', - 'callback': '', - "headers_apollo": {} - } - responses.add(responses.POST, - f"http://mock_address{CERES_CVE_REPO_SET}", - json={'code': SUCCEED, 'msg': 'operate succeed'}, - status=SUCCEED, - content_type='application/json' - ) - result = ExecuteRepoSetTask()._execute_task(mock_args) - self.assertEqual(expect_result, result) -- 2.37.1.windows.1