diff -Naru a/packageship/application/apps/lifecycle/function/concurrent.py b/packageship/application/apps/lifecycle/function/concurrent.py --- a/packageship/application/apps/lifecycle/function/concurrent.py 2020-09-22 23:34:04.037937224 +0800 +++ b/packageship/application/apps/lifecycle/function/concurrent.py 2020-09-22 23:48:39.938515522 +0800 @@ -1,65 +1,80 @@ -#!/usr/bin/python3 -""" -Use queues to implement the producer and consumer model -to solve the database lock introduced by high concurrency issues -""" -import threading -from queue import Queue -from sqlalchemy.exc import SQLAlchemyError -from packageship.libs.dbutils import DBHelper -from packageship.libs.exception import Error, ContentNoneException -from packageship.libs.log import Log - - -class ProducerConsumer(): - """ - The data written in the database is added to the high - concurrency queue, and the high concurrency is solved - by the form of the queue - """ - _queue = Queue(maxsize=0) - _instance_lock = threading.Lock() - _log = Log(__name__) - - def __init__(self): - self.thread_queue = threading.Thread(target=self.__queue_process) - if not self.thread_queue.isAlive(): - self.thread_queue.start() - - def start_thread(self): - """ - Judge a thread, if the thread is terminated, restart - """ - if not self.thread_queue.isAlive(): - self.thread_queue = threading.Thread(target=self.__queue_process) - self.thread_queue.start() - - def __new__(cls, *args, **kwargs): # pylint: disable=unused-argument - """ - Use the singleton pattern to create a thread-safe producer pattern - """ - if not hasattr(cls, "_instance"): - with cls._instance_lock: - if not hasattr(cls, "_instance"): - cls._instance = object.__new__(cls) - return cls._instance - - def __queue_process(self): - """ - Read the content in the queue and save and update - """ - while not self._queue.empty(): - _queue_value = self._queue.get() - try: - with DBHelper(db_name="lifecycle") as database: - database.add(_queue_value) - except (Error, ContentNoneException, SQLAlchemyError) as error: - self._log.logger.error(error) - - def put(self, pending_content): - """ - The content of the operation is added to the queue - """ - if pending_content: - self._queue.put(pending_content) - self.start_thread() +#!/usr/bin/python3 +""" +Use queues to implement the producer and consumer model +to solve the database lock introduced by high concurrency issues +""" +import threading +import time +from queue import Queue +from sqlalchemy.exc import SQLAlchemyError +from sqlalchemy.exc import OperationalError +from packageship.libs.exception import Error, ContentNoneException +from packageship.libs.log import Log +from packageship.libs.configutils.readconfig import ReadConfig +from packageship import system_config + + +class ProducerConsumer(): + """ + The data written in the database is added to the high + concurrency queue, and the high concurrency is solved + by the form of the queue + """ + _readconfig = ReadConfig(system_config.SYS_CONFIG_PATH) + queue_maxsize = int(_readconfig.get_config('LIFECYCLE', 'queue_maxsize')) + if not isinstance(queue_maxsize, int): + queue_maxsize = 1000 + _queue = Queue(maxsize=queue_maxsize) + _instance_lock = threading.Lock() + _log = Log(__name__) + + def __init__(self): + self.thread_queue = threading.Thread(target=self.__queue_process) + self._instance_lock.acquire() + if not self.thread_queue.isAlive(): + self.thread_queue = threading.Thread(target=self.__queue_process) + self.thread_queue.start() + self._instance_lock.release() + + def start_thread(self): + """ + Judge a thread, if the thread is terminated, restart + """ + self._instance_lock.acquire() + if not self.thread_queue.isAlive(): + self.thread_queue = threading.Thread(target=self.__queue_process) + self.thread_queue.start() + self._instance_lock.release() + + def __new__(cls, *args, **kwargs): # pylint: disable=unused-argument + """ + Use the singleton pattern to create a thread-safe producer pattern + """ + if not hasattr(cls, "_instance"): + with cls._instance_lock: + if not hasattr(cls, "_instance"): + cls._instance = object.__new__(cls) + return cls._instance + + def __queue_process(self): + """ + Read the content in the queue and save and update + """ + while not self._queue.empty(): + _queue_value, method = self._queue.get() + try: + method(_queue_value) + except OperationalError as error: + self._log.logger.warning(error) + time.sleep(0.2) + self._queue.put((_queue_value, method)) + except (Error, ContentNoneException, SQLAlchemyError) as error: + self._log.logger.error(error) + + def put(self, pending_content): + """ + The content of the operation is added to the queue + """ + if pending_content: + self._queue.put(pending_content) + self.start_thread() diff -Naru a/packageship/application/apps/lifecycle/function/download_yaml.py b/packageship/application/apps/lifecycle/function/download_yaml.py --- a/packageship/application/apps/lifecycle/function/download_yaml.py 2020-09-22 23:34:04.037937224 +0800 +++ b/packageship/application/apps/lifecycle/function/download_yaml.py 2020-09-22 23:48:46.478549707 +0800 @@ -1,222 +1,224 @@ -#!/usr/bin/python3 -""" -Dynamically obtain the content of the yaml file \ -that saves the package information, periodically \ -obtain the content and save it in the database -""" -import copy -from concurrent.futures import ThreadPoolExecutor -import datetime as date -import requests -import yaml -from retrying import retry -from sqlalchemy.exc import SQLAlchemyError -from requests.exceptions import HTTPError -from packageship import system_config -from packageship.application.models.package import Packages -from packageship.application.models.package import PackagesMaintainer -from packageship.libs.dbutils import DBHelper -from packageship.libs.exception import Error, ContentNoneException -from packageship.libs.configutils.readconfig import ReadConfig -from .base import Base -from .gitee import Gitee -from .concurrent import ProducerConsumer - - -class ParseYaml(): - """ - Description: Analyze the downloaded remote yaml file, obtain the tags - and maintainer information in the yaml file, and save the obtained - relevant information into the database - - Attributes: - base: base class instance - pkg: Specific package data - _table_name: The name of the data table to be operated - openeuler_advisor_url: Get the warehouse address of the yaml file - _yaml_content: The content of the yaml file - """ - - def __init__(self, pkg_info, base, table_name): - self.base = base - self.pkg = pkg_info - self._table_name = table_name - self.openeuler_advisor_url = self._path_stitching(pkg_info.name) - self._yaml_content = None - self.timed_task_open = self._timed_task_status() - self.producer_consumer = ProducerConsumer() - - def _timed_task_status(self): - """ - The open state of information such as the maintainer in the scheduled task - """ - _timed_task_status = True - _readconfig = ReadConfig(system_config.SYS_CONFIG_PATH) - open_status = _readconfig.get_config('TIMEDTASK', 'open') - if open_status not in ('True', 'False'): - self.base.log.logger.error( - 'Wrong setting of the open state value of the scheduled task') - if open_status == 'False': - self.timed_task_open = False - return _timed_task_status - - def _path_stitching(self, pkg_name): - """ - The path of the remote service call - """ - _readconfig = ReadConfig(system_config.SYS_CONFIG_PATH) - _remote_url = _readconfig.get_config('LIFECYCLE', 'warehouse_remote') - if _remote_url is None: - _remote_url = 'https://gitee.com/openeuler/openEuler-Advisor/raw/master/upstream-info/' - return _remote_url + '{pkg_name}.yaml'.format(pkg_name=pkg_name) - - def update_database(self): - """ - For the current package, determine whether the specific yaml file exists, parse - the data in it and save it in the database if it exists, and record the relevant - log if it does not exist - - """ - if self._openeuler_advisor_exists_yaml(): - self._save_to_database() - else: - msg = "The yaml information of the [%s] package has not been" \ - "obtained yet" % self.pkg.name - self.base.log.logger.warning(msg) - - def _get_yaml_content(self, url): - """ - - """ - try: - response = requests.get( - url, headers=self.base.headers) - if response.status_code == 200: - self._yaml_content = yaml.safe_load(response.content) - - except HTTPError as error: - self.base.log.logger.error(error) - - def _openeuler_advisor_exists_yaml(self): - """ - Determine whether there is a yaml file with the current \ - package name under the openeuler-advisor project - - """ - self._get_yaml_content(self.openeuler_advisor_url) - if self._yaml_content: - return True - return False - - def _save_to_database(self): - """ - Save the acquired yaml file information to the database - - Raises: - ContentNoneException: The added entity content is empty - Error: An error occurred during data addition - """ - self._parse_warehouse_info() - tags = self._yaml_content.get('git_tag', None) - if tags: - self._parse_tags_content(tags) - self.producer_consumer.put(copy.deepcopy(self.pkg)) - if self.timed_task_open: - _maintainer = self._yaml_content.get('maintainers') - if _maintainer and isinstance(_maintainer, list): - self.pkg.maintainer = _maintainer[0] - self.pkg.maintainlevel = self._yaml_content.get('maintainlevel') - try: - if self.timed_task_open: - @retry(stop_max_attempt_number=3, stop_max_delay=500) - def _save_maintainer_info(): - with DBHelper(db_name="lifecycle") as database: - _packages_maintainer = database.session.query( - PackagesMaintainer).filter( - PackagesMaintainer.name == self.pkg.name).first() - if _packages_maintainer: - _packages_maintainer.name = self.pkg.name - _packages_maintainer.maintainer = self.pkg.maintainer - _packages_maintainer.maintainlevel = self.pkg.maintainlevel - else: - _packages_maintainer = PackagesMaintainer( - name=self.pkg.name, maintainer=self.pkg.maintainer, - maintainlevel=self.pkg.maintainlevel) - self.producer_consumer.put( - copy.deepcopy(_packages_maintainer)) - _save_maintainer_info() - except (Error, ContentNoneException, SQLAlchemyError) as error: - self.base.log.logger.error(error) - - def _parse_warehouse_info(self): - """ - Parse the warehouse information in the yaml file - - """ - if self._yaml_content: - self.pkg.version_control = self._yaml_content.get( - 'version_control') - self.pkg.src_repo = self._yaml_content.get('src_repo') - self.pkg.tag_prefix = self._yaml_content.get('tag_prefix') - - def _parse_tags_content(self, tags): - """ - Parse the obtained tags content - - """ - try: - # Integrate tags information into key-value pairs - _tags = [(tag.split()[0], tag.split()[1]) for tag in tags] - _tags = sorted(_tags, key=lambda x: x[0], reverse=True) - self.pkg.latest_version = _tags[0][1] - self.pkg.latest_version_time = _tags[0][0] - _end_time = date.datetime.strptime( - self.pkg.latest_version_time, '%Y-%m-%d') - if self.pkg.latest_version != self.pkg.version: - for _version in _tags: - if _version[1] == self.pkg.version: - _end_time = date.datetime.strptime( - _version[0], '%Y-%m-%d') - self.pkg.used_time = (date.datetime.now() - _end_time).days - - except (IndexError, Error) as index_error: - self.base.log.logger.error(index_error) - - -def update_pkg_info(pkg_info_update=True): - """ - Update the information of the upstream warehouse in the source package - - """ - try: - base_control = Base() - _readconfig = ReadConfig(system_config.SYS_CONFIG_PATH) - pool_workers = _readconfig.get_config('LIFECYCLE', 'pool_workers') - _warehouse = _readconfig.get_config('LIFECYCLE', 'warehouse') - if _warehouse is None: - _warehouse = 'src-openeuler' - if not isinstance(pool_workers, int): - pool_workers = 10 - # Open thread pool - pool = ThreadPoolExecutor(max_workers=pool_workers) - with DBHelper(db_name="lifecycle") as database: - for table_name in filter(lambda x: x not in ['packages_issue', 'packages_maintainer'], - database.engine.table_names()): - - cls_model = Packages.package_meta(table_name) - # Query a specific table - for package_item in database.session.query(cls_model).all(): - if pkg_info_update: - parse_yaml = ParseYaml( - pkg_info=copy.deepcopy(package_item), - base=base_control, - table_name=table_name) - pool.submit(parse_yaml.update_database) - else: - # Get the issue of each warehouse and save it - gitee_issue = Gitee( - package_item, _warehouse, package_item.name, table_name) - pool.submit(gitee_issue.query_issues_info) - pool.shutdown() - except SQLAlchemyError as error_msg: - base_control.log.logger.error(error_msg) +#!/usr/bin/python3 +""" +Dynamically obtain the content of the yaml file \ +that saves the package information, periodically \ +obtain the content and save it in the database +""" +import copy +from concurrent.futures import ThreadPoolExecutor +import datetime as date +import requests +import yaml +from retrying import retry +from sqlalchemy.exc import SQLAlchemyError +from requests.exceptions import HTTPError +from packageship import system_config +from packageship.application.models.package import Packages +from packageship.application.models.package import PackagesMaintainer +from packageship.libs.dbutils import DBHelper +from packageship.libs.exception import Error, ContentNoneException +from packageship.libs.configutils.readconfig import ReadConfig +from .base import Base +from .gitee import Gitee +from .concurrent import ProducerConsumer + + +class ParseYaml(): + """ + Description: Analyze the downloaded remote yaml file, obtain the tags + and maintainer information in the yaml file, and save the obtained + relevant information into the database + + Attributes: + base: base class instance + pkg: Specific package data + _table_name: The name of the data table to be operated + openeuler_advisor_url: Get the warehouse address of the yaml file + _yaml_content: The content of the yaml file + """ + + def __init__(self, pkg_info, base, table_name): + self.base = base + self.pkg = pkg_info + self._table_name = table_name + self.openeuler_advisor_url = self._path_stitching(pkg_info.name) + self._yaml_content = None + self.timed_task_open = self._timed_task_status() + self.producer_consumer = ProducerConsumer() + + def _timed_task_status(self): + """ + The open state of information such as the maintainer in the scheduled task + """ + _timed_task_status = True + _readconfig = ReadConfig(system_config.SYS_CONFIG_PATH) + open_status = _readconfig.get_config('TIMEDTASK', 'open') + if open_status not in ('True', 'False'): + self.base.log.logger.error( + 'Wrong setting of the open state value of the scheduled task') + if open_status == 'False': + self.timed_task_open = False + return _timed_task_status + + def _path_stitching(self, pkg_name): + """ + The path of the remote service call + """ + _readconfig = ReadConfig(system_config.SYS_CONFIG_PATH) + _remote_url = _readconfig.get_config('LIFECYCLE', 'warehouse_remote') + if _remote_url is None: + _remote_url = 'https://gitee.com/openeuler/openEuler-Advisor/raw/master/upstream-info/' + return _remote_url + '{pkg_name}.yaml'.format(pkg_name=pkg_name) + + def update_database(self): + """ + For the current package, determine whether the specific yaml file exists, parse + the data in it and save it in the database if it exists, and record the relevant + log if it does not exist + + """ + if self._openeuler_advisor_exists_yaml(): + self._save_to_database() + else: + msg = "The yaml information of the [%s] package has not been" \ + "obtained yet" % self.pkg.name + self.base.log.logger.warning(msg) + + def _get_yaml_content(self, url): + """ + + """ + try: + response = requests.get( + url, headers=self.base.headers) + if response.status_code == 200: + self._yaml_content = yaml.safe_load(response.content) + + except HTTPError as error: + self.base.log.logger.error(error) + + def _openeuler_advisor_exists_yaml(self): + """ + Determine whether there is a yaml file with the current \ + package name under the openeuler-advisor project + + """ + self._get_yaml_content(self.openeuler_advisor_url) + if self._yaml_content: + return True + return False + + def _save_to_database(self): + """ + Save the acquired yaml file information to the database + + Raises: + ContentNoneException: The added entity content is empty + Error: An error occurred during data addition + """ + + def _save_package(package_module): + with DBHelper(db_name="lifecycle") as database: + database.add(package_module) + + def _save_maintainer_info(maintainer_module): + with DBHelper(db_name="lifecycle") as database: + _packages_maintainer = database.session.query( + PackagesMaintainer).filter( + PackagesMaintainer.name == maintainer_module['name']).first() + if _packages_maintainer: + for key, val in maintainer_module.items(): + setattr(_packages_maintainer, key, val) + else: + _packages_maintainer = PackagesMaintainer( + **maintainer_module) + database.add(_packages_maintainer) + + self._parse_warehouse_info() + tags = self._yaml_content.get('git_tag', None) + if tags: + self._parse_tags_content(tags) + self.producer_consumer.put( + (copy.deepcopy(self.pkg), _save_package)) + if self.timed_task_open: + maintainer = {'name': self.pkg.name} + _maintainer = self._yaml_content.get('maintainers') + if _maintainer and isinstance(_maintainer, list): + maintainer['maintainer'] = _maintainer[0] + maintainer['maintainlevel'] = self._yaml_content.get( + 'maintainlevel') + + self.producer_consumer.put((maintainer, _save_maintainer_info)) + + def _parse_warehouse_info(self): + """ + Parse the warehouse information in the yaml file + + """ + if self._yaml_content: + self.pkg.version_control = self._yaml_content.get( + 'version_control') + self.pkg.src_repo = self._yaml_content.get('src_repo') + self.pkg.tag_prefix = self._yaml_content.get('tag_prefix') + + def _parse_tags_content(self, tags): + """ + Parse the obtained tags content + + """ + try: + # Integrate tags information into key-value pairs + _tags = [(tag.split()[0], tag.split()[1]) for tag in tags] + _tags = sorted(_tags, key=lambda x: x[0], reverse=True) + self.pkg.latest_version = _tags[0][1] + self.pkg.latest_version_time = _tags[0][0] + _end_time = date.datetime.strptime( + self.pkg.latest_version_time, '%Y-%m-%d') + if self.pkg.latest_version != self.pkg.version: + for _version in _tags: + if _version[1] == self.pkg.version: + _end_time = date.datetime.strptime( + _version[0], '%Y-%m-%d') + self.pkg.used_time = (date.datetime.now() - _end_time).days + + except (IndexError, Error) as index_error: + self.base.log.logger.error(index_error) + + +def update_pkg_info(pkg_info_update=True): + """ + Update the information of the upstream warehouse in the source package + + """ + try: + base_control = Base() + _readconfig = ReadConfig(system_config.SYS_CONFIG_PATH) + pool_workers = _readconfig.get_config('LIFECYCLE', 'pool_workers') + _warehouse = _readconfig.get_config('LIFECYCLE', 'warehouse') + if _warehouse is None: + _warehouse = 'src-openeuler' + if not isinstance(pool_workers, int): + pool_workers = 10 + # Open thread pool + pool = ThreadPoolExecutor(max_workers=pool_workers) + with DBHelper(db_name="lifecycle") as database: + for table_name in filter(lambda x: x not in ['packages_issue', 'packages_maintainer', 'database_info'], + database.engine.table_names()): + + cls_model = Packages.package_meta(table_name) + # Query a specific table + for package_item in database.session.query(cls_model).all(): + if pkg_info_update: + parse_yaml = ParseYaml( + pkg_info=copy.deepcopy(package_item), + base=base_control, + table_name=table_name) + pool.submit(parse_yaml.update_database) + else: + # Get the issue of each warehouse and save it + gitee_issue = Gitee( + copy.deepcopy(package_item), _warehouse, package_item.name, table_name) + pool.submit(gitee_issue.query_issues_info) + pool.shutdown() + except SQLAlchemyError as error_msg: + base_control.log.logger.error(error_msg) diff -Naru a/packageship/application/apps/lifecycle/function/gitee.py b/packageship/application/apps/lifecycle/function/gitee.py --- a/packageship/application/apps/lifecycle/function/gitee.py 2020-09-22 23:34:04.037937224 +0800 +++ b/packageship/application/apps/lifecycle/function/gitee.py 2020-09-22 23:48:52.698582219 +0800 @@ -1,224 +1,223 @@ -#!/usr/bin/python3 -""" -Description:Get issue info from gitee -Class: Gitee -""" -import copy -from json import JSONDecodeError -from retrying import retry -import requests -from requests.exceptions import HTTPError -from sqlalchemy.exc import SQLAlchemyError -from packageship.libs.dbutils import DBHelper -from packageship.libs.configutils.readconfig import ReadConfig -from packageship.libs.exception import Error, ContentNoneException -from packageship.application.models.package import PackagesIssue -from packageship import system_config -from packageship.libs.log import Log -from .concurrent import ProducerConsumer - -LOGGER = Log(__name__) - - -class Gitee(): - """ - gitee version management tool related information acquisition - - """ - - def __init__(self, pkg_info, owner, repo, table_name): - self.pkg_info = pkg_info - self.owner = owner - self.repo = repo - self._read_config = ReadConfig(system_config.SYS_CONFIG_PATH) - self.url = "https://gitee.com/" - self.api_url = "https://gitee.com/api/v5/repos" - self.pool = None - self.issue_id = None - self.defect = 0 - self.feature = 0 - self.cve = 0 - self.patch_files_path = self._read_config.get_system( - "patch_files_path") - self.table_name = table_name - self.producer_consumer = ProducerConsumer() - - def query_issues_info(self, issue_id=""): - """ - Description: View the issue details of the specified package - Args: - issue_id: Issue id - Returns: - issue_content_list: The issue details of the specified package list - Raises: - - """ - issue_url = self.api_url + \ - "/{}/{}/issues/{}".format(self.owner, self.repo, issue_id) - try: - response = requests.get( - issue_url, params={"state": "all", "per_page": 100}) - except Error as error: - LOGGER.logger.error(error) - return None - if response.status_code != 200: - return None - total_page = 1 if issue_id else int(response.headers['total_page']) - total_count = int(response.headers['total_count']) - if total_count > 0: - issue_list = self._query_per_page_issue_info(total_page, issue_url) - if not issue_list: - LOGGER.logger.error( - "An error occurred while querying {}".format(self.repo)) - return None - self._save_issues(issue_list) - - def _query_per_page_issue_info(self, total_page, issue_url): - """ - Description: View the issue details - Args: - total_page: total page - issue_url: issue url - - Returns: - - """ - issue_content_list = [] - for i in range(1, total_page + 1): - - @retry(stop_max_attempt_number=3, stop_max_delay=1000) - def request_issue(page, issue_url): - try: - response = requests.get(issue_url, - params={"state": "all", "per_page": 100, "page": page}) - except HTTPError: - raise HTTPError('Network request error') - return response - - try: - response = request_issue(i, issue_url) - if response.status_code != 200: - LOGGER.logger.warning(response.content.decode("utf-8")) - continue - issue_content_list.extend( - self.parse_issues_content(response.json())) - except (JSONDecodeError, Error) as error: - LOGGER.logger.error(error) - return issue_content_list - - def _save_issues(self, issue_list): - """ - Save the obtained issue information - - """ - try: - issue_ids = [issue['issue_id'] for issue in issue_list] - with DBHelper(db_name="lifecycle") as database: - - @retry(stop_max_attempt_number=3, stop_max_delay=500) - def _query_pkgissues(): - exist_issues = database.session.query(PackagesIssue).filter( - PackagesIssue.issue_id.in_(issue_ids)).all() # pylint: disable=protected-access - return exist_issues - - exist_issues = _query_pkgissues() - # Save the issue - for issue_item in issue_list: - issue_model = [ - issue for issue in exist_issues if issue.issue_id == issue_item['issue_id']] - if issue_model: - for key, val in issue_item.items(): - setattr(issue_model[0], key, val) - self.producer_consumer.put( - copy.deepcopy(issue_model[0])) - else: - self.producer_consumer.put( - PackagesIssue(**issue_item)) - - # The number of various issues in the update package - self.pkg_info.defect = self.defect - self.pkg_info.feature = self.feature - self.pkg_info.cve = self.cve - self.producer_consumer.put(copy.deepcopy(self.pkg_info)) - - except (Error, ContentNoneException, SQLAlchemyError) as error: - LOGGER.logger.error( - 'An abnormal error occurred while saving related issues:%s' % error if error else '') - - def parse_issues_content(self, sources): - """ - Description: Parse the response content and get issue content - Args:Issue list - - Returns:list:issue_id, issue_url, issue_content, issue_status, issue_download - Raises: - """ - result_list = [] - if isinstance(sources, list): - for source in sources: - issue_content = self.parse_issue_content(source) - if issue_content: - result_list.append(issue_content) - else: - issue_content = self.parse_issue_content(sources) - if issue_content: - result_list.append(issue_content) - return result_list - - def parse_issue_content(self, source): - """ - Description: Parse the response content and get issue content - Args: Source of issue content - - Returns:list:issue_id, issue_url, issue_content, issue_status, issue_download, issue_status - issue_type, related_release - Raises:KeyError - """ - try: - result_dict = {"issue_id": source['number'], "issue_url": source['html_url'], - "issue_title": source['title'].strip(), - "issue_content": source['body'].strip(), - "issue_status": source['state'], "issue_download": "", - "issue_type": source["issue_type"], - "pkg_name": self.repo, - "related_release": source["labels"][0]['name'] if source["labels"] else ''} - if source["issue_type"] == "缺陷": - self.defect += 1 - elif source["issue_type"] == "需求": - self.feature += 1 - elif source["issue_type"] == "CVE和安全问题": - self.cve += 1 - else: - pass - except KeyError as error: - LOGGER.logger.error(error) - return None - return result_dict - - def issue_hooks(self, issue_hook_info): - """ - Description: Hook data triggered by a new task operation - Args: - issue_hook_info: Issue info - Returns: - - Raises: - - """ - if issue_hook_info is None: - raise ContentNoneException( - 'The content cannot be empty') - issue_info_list = [] - issue_info = issue_hook_info["issue"] - issue_content = self.parse_issue_content(issue_info) - if issue_content: - issue_info_list.append(issue_content) - if self.feature != 0: - self.defect, self.feature, self.cve = self.pkg_info.defect, self.pkg_info.feature + \ - 1, self.pkg_info.cve - if self.defect != 0: - self.defect, self.feature, self.cve = self.pkg_info.defect + \ - 1, self.pkg_info.feature, self.pkg_info.cve - if self.cve != 0: - self.defect, self.feature, self.cve = self.pkg_info.defect, self.pkg_info.feature, self.pkg_info.cve + 1 - self._save_issues(issue_info_list) +#!/usr/bin/python3 +""" +Description:Get issue info from gitee +Class: Gitee +""" +import copy +from json import JSONDecodeError +from retrying import retry +import requests +from requests.exceptions import HTTPError +from sqlalchemy.exc import SQLAlchemyError +from packageship.libs.dbutils import DBHelper +from packageship.libs.configutils.readconfig import ReadConfig +from packageship.libs.exception import Error, ContentNoneException +from packageship.application.models.package import PackagesIssue +from packageship import system_config +from packageship.libs.log import Log +from .concurrent import ProducerConsumer + +LOGGER = Log(__name__) + + +class Gitee(): + """ + gitee version management tool related information acquisition + + """ + + def __init__(self, pkg_info, owner, repo, table_name): + self.pkg_info = pkg_info + self.owner = owner + self.repo = repo + self._read_config = ReadConfig(system_config.SYS_CONFIG_PATH) + self.url = "https://gitee.com/" + self.api_url = "https://gitee.com/api/v5/repos" + self.pool = None + self.issue_id = None + self.defect = 0 + self.feature = 0 + self.cve = 0 + self.patch_files_path = self._read_config.get_system( + "patch_files_path") + self.table_name = table_name + self.producer_consumer = ProducerConsumer() + + def query_issues_info(self, issue_id=""): + """ + Description: View the issue details of the specified package + Args: + issue_id: Issue id + Returns: + issue_content_list: The issue details of the specified package list + Raises: + + """ + issue_url = self.api_url + \ + "/{}/{}/issues/{}".format(self.owner, self.repo, issue_id) + try: + response = requests.get( + issue_url, params={"state": "all", "per_page": 100}) + except Error as error: + LOGGER.logger.error(error) + return None + if response.status_code != 200: + return None + total_page = 1 if issue_id else int(response.headers['total_page']) + total_count = int(response.headers['total_count']) + if total_count > 0: + issue_list = self._query_per_page_issue_info(total_page, issue_url) + if not issue_list: + LOGGER.logger.error( + "An error occurred while querying {}".format(self.repo)) + return None + self._save_issues(issue_list) + + def _query_per_page_issue_info(self, total_page, issue_url): + """ + Description: View the issue details + Args: + total_page: total page + issue_url: issue url + + Returns: + + """ + issue_content_list = [] + for i in range(1, total_page + 1): + + @retry(stop_max_attempt_number=3, stop_max_delay=1000) + def request_issue(page, issue_url): + try: + response = requests.get(issue_url, + params={"state": "all", "per_page": 100, "page": page}) + except HTTPError: + raise HTTPError('Network request error') + return response + + try: + response = request_issue(i, issue_url) + if response.status_code != 200: + LOGGER.logger.warning(response.content.decode("utf-8")) + continue + issue_content_list.extend( + self.parse_issues_content(response.json())) + except (JSONDecodeError, Error) as error: + LOGGER.logger.error(error) + return issue_content_list + + def _save_issues(self, issue_list): + """ + Save the obtained issue information + + """ + try: + def _save(issue_module): + with DBHelper(db_name='lifecycle') as database: + + exist_issues = database.session.query(PackagesIssue).filter( + PackagesIssue.issue_id == issue_module['issue_id']).first() + if exist_issues: + + # Save the issue + for key, val in issue_module.items(): + setattr(exist_issues, key, val) + else: + exist_issues = PackagesIssue(**issue_module) + database.add(exist_issues) + + def _save_package(package_module): + with DBHelper(db_name='lifecycle') as database: + database.add(package_module) + + for issue_item in issue_list: + self.producer_consumer.put( + (copy.deepcopy(issue_item), _save)) + + # The number of various issues in the update package + self.pkg_info.defect = self.defect + self.pkg_info.feature = self.feature + self.pkg_info.cve = self.cve + self.producer_consumer.put((copy.deepcopy(self.pkg_info), _save_package)) + + except (Error, ContentNoneException, SQLAlchemyError) as error: + LOGGER.logger.error( + 'An abnormal error occurred while saving related issues:%s' % error if error else '') + + def parse_issues_content(self, sources): + """ + Description: Parse the response content and get issue content + Args:Issue list + + Returns:list:issue_id, issue_url, issue_content, issue_status, issue_download + Raises: + """ + result_list = [] + if isinstance(sources, list): + for source in sources: + issue_content = self.parse_issue_content(source) + if issue_content: + result_list.append(issue_content) + else: + issue_content = self.parse_issue_content(sources) + if issue_content: + result_list.append(issue_content) + return result_list + + def parse_issue_content(self, source): + """ + Description: Parse the response content and get issue content + Args: Source of issue content + + Returns:list:issue_id, issue_url, issue_content, issue_status, issue_download, issue_status + issue_type, related_release + Raises:KeyError + """ + try: + result_dict = {"issue_id": source['number'], "issue_url": source['html_url'], + "issue_title": source['title'].strip(), + "issue_content": source['body'].strip(), + "issue_status": source['state'], "issue_download": "", + "issue_type": source["issue_type"], + "pkg_name": self.repo, + "related_release": source["labels"][0]['name'] if source["labels"] else ''} + if source["issue_type"] == "缺陷": + self.defect += 1 + elif source["issue_type"] == "需求": + self.feature += 1 + elif source["issue_type"] == "CVE和安全问题": + self.cve += 1 + else: + pass + except KeyError as error: + LOGGER.logger.error(error) + return None + return result_dict + + def issue_hooks(self, issue_hook_info): + """ + Description: Hook data triggered by a new task operation + Args: + issue_hook_info: Issue info + Returns: + + Raises: + + """ + if issue_hook_info is None: + raise ContentNoneException( + 'The content cannot be empty') + issue_info_list = [] + issue_info = issue_hook_info["issue"] + issue_content = self.parse_issue_content(issue_info) + if issue_content: + issue_info_list.append(issue_content) + if self.feature != 0: + self.defect, self.feature, self.cve = self.pkg_info.defect, self.pkg_info.feature + \ + 1, self.pkg_info.cve + if self.defect != 0: + self.defect, self.feature, self.cve = self.pkg_info.defect + \ + 1, self.pkg_info.feature, self.pkg_info.cve + if self.cve != 0: + self.defect, self.feature, self.cve = self.pkg_info.defect, self.pkg_info.feature, self.pkg_info.cve + 1 + self._save_issues(issue_info_list) diff -Naru a/packageship/application/apps/lifecycle/view.py b/packageship/application/apps/lifecycle/view.py --- a/packageship/application/apps/lifecycle/view.py 2020-09-22 23:34:04.037937224 +0800 +++ b/packageship/application/apps/lifecycle/view.py 2020-09-22 23:52:49.731821183 +0800 @@ -1,760 +1,760 @@ -#!/usr/bin/python3 -""" -Life cycle related api interface -""" -import io -import json -import math -import os -from concurrent.futures import ThreadPoolExecutor - -import pandas as pd -import yaml - -from flask import request -from flask import jsonify, make_response -from flask import current_app -from flask_restful import Resource -from marshmallow import ValidationError - -from sqlalchemy.exc import DisconnectionError, SQLAlchemyError - -from packageship import system_config -from packageship.libs.configutils.readconfig import ReadConfig -from packageship.libs.exception import Error -from packageship.application.apps.package.function.constants import ResponseCode -from packageship.libs.dbutils.sqlalchemy_helper import DBHelper -from packageship.application.models.package import PackagesIssue -from packageship.application.models.package import Packages -from packageship.application.models.package import PackagesMaintainer -from packageship.libs.log import Log -from .serialize import IssueDownloadSchema, PackagesDownloadSchema, IssuePageSchema, IssueSchema -from ..package.serialize import DataFormatVerfi, UpdatePackagesSchema -from .function.gitee import Gitee as gitee - -LOGGER = Log(__name__) - - -# pylint: disable = no-self-use - -class DownloadFile(Resource): - """ - Download the content of the issue or the excel file of the package content - """ - - def _download_excel(self, file_type, table_name=None): - """ - Download excel file - """ - file_name = 'packages.xlsx' - if file_type == 'packages': - download_content = self.__get_packages_content(table_name) - else: - file_name = 'issues.xlsx' - download_content = self.__get_issues_content() - if download_content is None: - return jsonify( - ResponseCode.response_json( - ResponseCode.SERVICE_ERROR)) - pd_dataframe = self.__to_dataframe(download_content) - - _response = self.__bytes_save(pd_dataframe) - return self.__set_response_header(_response, file_name) - - def __bytes_save(self, data_frame): - """ - Save the file content in the form of a binary file stream - """ - try: - bytes_io = io.BytesIO() - writer = pd.ExcelWriter( # pylint: disable=abstract-class-instantiated - bytes_io, engine='xlsxwriter') - data_frame.to_excel(writer, sheet_name='Summary', index=False) - writer.save() - writer.close() - bytes_io.seek(0) - _response = make_response(bytes_io.getvalue()) - bytes_io.close() - return _response - except (IOError, Error) as io_error: - current_app.logger.error(io_error) - return make_response() - - def __set_response_header(self, response, file_name): - """ - Set http response header information - """ - response.headers['Content-Type'] = \ - "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" - response.headers["Cache-Control"] = "no-cache" - response.headers['Content-Disposition'] = 'attachment; filename={file_name}'.format( - file_name=file_name) - return response - - def __get_packages_content(self, table_name): - """ - Get package list information - """ - try: - with DBHelper(db_name='lifecycle') as database: - # Query all package data in the specified table - _model = Packages.package_meta(table_name) - _packageinfos = database.session.query(_model).all() - packages_dicts = PackagesDownloadSchema( - many=True).dump(_packageinfos) - return packages_dicts - - except (SQLAlchemyError, DisconnectionError) as error: - current_app.logger.error(error) - return None - - def __get_issues_content(self): - """ - Get the list of issues - """ - try: - with DBHelper(db_name='lifecycle') as database: - _issues = database.session.query(PackagesIssue).all() - issues_dicts = IssueDownloadSchema(many=True).dump(_issues) - return issues_dicts - except (SQLAlchemyError, DisconnectionError) as error: - current_app.logger.error(error) - return None - - def __to_dataframe(self, datas): - """ - Convert the obtained information into pandas content format - """ - - data_frame = pd.DataFrame(datas) - return data_frame - - def get(self, file_type): - """ - Download package collection information and isse list information - - """ - if file_type not in ['packages', 'issues']: - return jsonify( - ResponseCode.response_json( - ResponseCode.PARAM_ERROR)) - - table_name = request.args.get('table_name', None) - response = self._download_excel(file_type, table_name) - return response - - -class MaintainerView(Resource): - """ - Maintainer name collection - """ - - def __query_maintainers(self): - """ - Query the names of all maintainers in the specified table - """ - try: - with DBHelper(db_name='lifecycle') as database: - maintainers = database.session.query( - PackagesMaintainer.maintainer).group_by(PackagesMaintainer.maintainer).all() - return [maintainer_item[0] for maintainer_item in maintainers - if maintainer_item[0]] - except (SQLAlchemyError, DisconnectionError) as error: - current_app.logger.error(error) - return [] - - def get(self): - """ - Get the list of maintainers - """ - # Group query of the names of all maintainers in the current table - maintainers = self.__query_maintainers() - return jsonify(ResponseCode.response_json( - ResponseCode.SUCCESS, - maintainers)) - - -class TableColView(Resource): - """ - The default column of the package shows the interface - """ - - def __columns_names(self): - """ - Mapping of column name and title - """ - columns = [ - ('name', 'Name', True), - ('version', 'Version', True), - ('release', 'Release', True), - ('url', 'Url', True), - ('rpm_license', 'License', False), - ('feature', 'Feature', False), - ('maintainer', 'Maintainer', True), - ('maintainlevel', 'Maintenance Level', True), - ('release_time', 'Release Time', False), - ('used_time', 'Used Time', True), - ('maintainer_status', 'Maintain Status', True), - ('latest_version', 'Latest Version', False), - ('latest_version_time', 'Latest Version Release Time', False), - ('issue', 'Issue', True)] - return columns - - def __columns_mapping(self): - """ - - """ - columns = list() - for column in self.__columns_names(): - columns.append({ - 'column_name': column[0], - 'label': column[1], - 'default_selected': column[2] - }) - return columns - - def get(self): - """ - Get the default display column of the package - - """ - table_mapping_columns = self.__columns_mapping() - return jsonify( - ResponseCode.response_json( - ResponseCode.SUCCESS, - table_mapping_columns)) - - -class LifeTables(Resource): - """ - description: LifeTables - Restful API: get - ChangeLog: - """ - - def get(self): - """ - return all table names in the database - - Returns: - Return the table names in the database as a list - """ - try: - with DBHelper(db_name="lifecycle") as database_name: - # View all table names in the package-info database - all_table_names = database_name.engine.table_names() - all_table_names.remove("packages_issue") - all_table_names.remove("packages_maintainer") - return jsonify( - ResponseCode.response_json( - ResponseCode.SUCCESS, data=all_table_names) - ) - except (SQLAlchemyError, DisconnectionError, Error, ValueError) as sql_error: - LOGGER.logger.error(sql_error) - return jsonify( - ResponseCode.response_json(ResponseCode.DATABASE_NOT_FOUND) - ) - - -class IssueView(Resource): - """ - Issue content collection - """ - - def _query_issues(self, request_data): - """ - Args: - request_data: - Returns: - """ - try: - with DBHelper(db_name='lifecycle') as database: - issues_query = database.session.query(PackagesIssue.issue_id, - PackagesIssue.issue_url, - PackagesIssue.issue_title, - PackagesIssue.issue_status, - PackagesIssue.pkg_name, - PackagesIssue.issue_type, - PackagesMaintainer.maintainer). \ - outerjoin(PackagesMaintainer, - PackagesMaintainer.name == PackagesIssue.pkg_name) - if request_data.get("pkg_name"): - issues_query = issues_query.filter( - PackagesIssue.pkg_name == request_data.get("pkg_name")) - if request_data.get("issue_type"): - issues_query = issues_query.filter( - PackagesIssue.issue_type == request_data.get("issue_type")) - if request_data.get("issue_status"): - issues_query = issues_query.filter( - PackagesIssue.issue_status == request_data.get("issue_status")) - if request_data.get("maintainer"): - issues_query = issues_query.filter( - PackagesMaintainer.maintainer == request_data.get("maintainer")) - total_count = issues_query.count() - total_page = math.ceil( - total_count / int(request_data.get("page_size"))) - issues_query = issues_query.limit(request_data.get("page_size")).offset( - (int(request_data.get("page_num")) - 1) * int(request_data.get("page_size"))) - issue_dicts = IssuePageSchema( - many=True).dump(issues_query.all()) - issue_data = ResponseCode.response_json( - ResponseCode.SUCCESS, issue_dicts) - issue_data['total_count'] = total_count - issue_data['total_page'] = total_page - return issue_data - except (SQLAlchemyError, DisconnectionError) as error: - current_app.logger.error(error) - return ResponseCode.response_json(ResponseCode.DATABASE_NOT_FOUND) - - def get(self): - """ - Description: Get all issues info or one specific issue - Args: - Returns: - [ - { - "issue_id": "", - "issue_url": "", - "issue_title": "", - "issue_content": "", - "issue_status": "", - "issue_type": "" - }, - ] - Raises: - DisconnectionError: Unable to connect to database exception - AttributeError: Object does not have this property - TypeError: Exception of type - Error: Abnormal error - """ - schema = IssueSchema() - if schema.validate(request.args): - return jsonify( - ResponseCode.response_json(ResponseCode.PARAM_ERROR) - ) - issue_dict = self._query_issues(request.args) - return issue_dict - - -class IssueType(Resource): - """ - Issue type collection - """ - - def _get_issue_type(self): - """ - Description: Query issue type - Returns: - """ - try: - with DBHelper(db_name='lifecycle') as database: - issues_query = database.session.query(PackagesIssue.issue_type).group_by( - PackagesIssue.issue_type).all() - return jsonify(ResponseCode.response_json( - ResponseCode.SUCCESS, [issue_query[0] for issue_query in issues_query])) - except (SQLAlchemyError, DisconnectionError) as error: - current_app.logger.error(error) - return jsonify(ResponseCode.response_json( - ResponseCode.PARAM_ERROR)) - - def get(self): - """ - Description: Get all issues info or one specific issue - Args: - Returns: - [ - "issue_type", - "issue_type" - ] - Raises: - DisconnectionError: Unable to connect to database exception - AttributeError: Object does not have this property - TypeError: Exception of type - Error: Abnormal error - """ - return self._get_issue_type() - - -class IssueStatus(Resource): - """ - Issue status collection - """ - - def _get_issue_status(self): - """ - Description: Query issue status - Returns: - """ - try: - with DBHelper(db_name='lifecycle') as database: - issues_query = database.session.query(PackagesIssue.issue_status).group_by( - PackagesIssue.issue_status).all() - return jsonify(ResponseCode.response_json( - ResponseCode.SUCCESS, [issue_query[0] for issue_query in issues_query])) - except (SQLAlchemyError, DisconnectionError) as error: - current_app.logger.error(error) - return jsonify(ResponseCode.response_json( - ResponseCode.PARAM_ERROR)) - - def get(self): - """ - Description: Get all issues info or one specific issue - Args: - Returns: - [ - "issue_status", - "issue_status" - ] - Raises: - DisconnectionError: Unable to connect to database exception - AttributeError: Object does not have this property - TypeError: Exception of type - Error: Abnormal error - """ - return self._get_issue_status() - - -class IssueCatch(Resource): - """ - description: Catch issue content - Restful API: put - ChangeLog: - """ - - def post(self): - """ - Searching issue content - Args: - Returns: - for examples: - [ - { - "issue_id": "", - "issue_url": "", - "issue_title": "", - "issue_content": "", - "issue_status": "", - "issue_type": "" - }, - ] - Raises: - DisconnectionError: Unable to connect to database exception - AttributeError: Object does not have this property - TypeError: Exception of type - Error: Abnormal error - """ - data = json.loads(request.get_data()) - if not isinstance(data, dict): - return jsonify( - ResponseCode.response_json(ResponseCode.PARAM_ERROR)) - pkg_name = data["repository"]["path"] - try: - _readconfig = ReadConfig(system_config.SYS_CONFIG_PATH) - pool_workers = _readconfig.get_config('LIFECYCLE', 'pool_workers') - _warehouse = _readconfig.get_config('LIFECYCLE', 'warehouse') - if _warehouse is None: - _warehouse = 'src-openeuler' - if not isinstance(pool_workers, int): - pool_workers = 10 - pool = ThreadPoolExecutor(max_workers=pool_workers) - with DBHelper(db_name="lifecycle") as database: - for table_name in filter(lambda x: x not in ['packages_issue', 'packages_maintainer'], - database.engine.table_names()): - cls_model = Packages.package_meta(table_name) - for package_item in database.session.query(cls_model).filter( - cls_model.name == pkg_name).all(): - gitee_issue = gitee( - package_item, _warehouse, package_item.name, table_name) - pool.submit(gitee_issue.issue_hooks, data) - pool.shutdown() - return jsonify(ResponseCode.response_json(ResponseCode.SUCCESS)) - except SQLAlchemyError as error_msg: - current_app.logger.error(error_msg) - - -class UpdatePackages(Resource): - """ - description:Life cycle update information of a single package - Restful API: post - ChangeLog: - """ - - def _get_all_yaml_name(self, filepath): - """ - List of all yaml file names in the folder - - Args: - filepath: file path - - Returns: - yaml_file_list:List of all yaml file names in the folder - - Attributes: - Error:Error - NotADirectoryError:Invalid directory name - FileNotFoundError:File not found error - - """ - try: - yaml_file_list = os.listdir(filepath) - return yaml_file_list - except (Error, NotADirectoryError, FileNotFoundError) as error: - current_app.logger.error(error) - return None - - def _get_yaml_content(self, yaml_file, filepath): - """ - Read the content of the yaml file - - Args: - yaml_file: yaml file - filepath: file path - - Returns: - Return a dictionary containing name, maintainer and maintainlevel - """ - yaml_data_dict = dict() - if not yaml_file.endswith(".yaml"): - return None - pkg_name = yaml_file.rsplit('.yaml')[0] - single_yaml_path = os.path.join(filepath, yaml_file) - with open(single_yaml_path, 'r', encoding='utf-8') as file_context: - yaml_flie_data = yaml.load( - file_context.read(), Loader=yaml.FullLoader) - if yaml_flie_data is None or not isinstance(yaml_flie_data, dict): - return None - maintainer = yaml_flie_data.get("maintainer") - maintainlevel = yaml_flie_data.get("maintainlevel") - yaml_data_dict['name'] = pkg_name - if maintainer: - yaml_data_dict['maintainer'] = maintainer - if maintainlevel: - yaml_data_dict['maintainlevel'] = maintainlevel - return yaml_data_dict - - def _read_yaml_file(self, filepath): - """ - Read the yaml file and combine the data of the nested dictionary of the list - - Args: - filepath: file path - - Returns: - yaml.YAMLError:yaml file error - SQLAlchemyError:SQLAlchemy Error - DisconnectionError:Connect to database error - Error:Error - """ - yaml_file_list = self._get_all_yaml_name(filepath) - if not yaml_file_list: - return None - try: - yaml_data_list = list() - _readconfig = ReadConfig(system_config.SYS_CONFIG_PATH) - pool_workers = _readconfig.get_config('LIFECYCLE', 'pool_workers') - if not isinstance(pool_workers, int): - pool_workers = 10 - with ThreadPoolExecutor(max_workers=pool_workers) as pool: - for yaml_file in yaml_file_list: - pool_result = pool.submit( - self._get_yaml_content, yaml_file, filepath) - yaml_data_dict = pool_result.result() - yaml_data_list.append(yaml_data_dict) - return yaml_data_list - except (yaml.YAMLError, SQLAlchemyError, DisconnectionError, Error) as error: - current_app.logger.error(error) - return None - - def _verification_yaml_data_list(self, yaml_data_list): - """ - Verify the data obtained in the yaml file - - Args: - yaml_data_list: yaml data list - - Returns: - yaml_data_list: After verification yaml data list - - Attributes: - ValidationError: Validation error - - """ - try: - DataFormatVerfi(many=True).load(yaml_data_list) - return yaml_data_list - except ValidationError as error: - current_app.logger.error(error.messages) - return None - - def _save_in_database(self, yaml_data_list): - """ - Save the data to the database - - Args: - tbname: Table Name - name_separate_list: Split name list - _update_pack_data: Split new list of combined data - - Returns: - SUCCESS or UPDATA_DATA_FAILED - - Attributes - DisconnectionError: Connect to database error - SQLAlchemyError: SQLAlchemy Error - Error: Error - - """ - try: - with DBHelper(db_name="lifecycle") as database_name: - if 'packages_maintainer' not in database_name.engine.table_names(): - return jsonify(ResponseCode.response_json( - ResponseCode.TABLE_NAME_NOT_EXIST)) - database_name.session.begin(subtransactions=True) - for yaml_data in yaml_data_list: - name = yaml_data.get("name") - maintainer = yaml_data.get("maintainer") - maintainlevel = yaml_data.get("maintainlevel") - packages_maintainer_obj = database_name.session.query( - PackagesMaintainer).filter_by(name=name).first() - if packages_maintainer_obj: - if maintainer: - packages_maintainer_obj.maintainer = maintainer - if maintainlevel: - packages_maintainer_obj.maintainlevel = maintainlevel - else: - database_name.add(PackagesMaintainer( - name=name, maintainer=maintainer, maintainlevel=maintainlevel - )) - database_name.session.commit() - return jsonify(ResponseCode.response_json( - ResponseCode.SUCCESS)) - except (DisconnectionError, SQLAlchemyError, Error, AttributeError) as error: - current_app.logger.error(error) - return jsonify(ResponseCode.response_json( - ResponseCode.UPDATA_DATA_FAILED)) - - def _overall_process( - self, - filepath): - """ - Call each method to complete the entire function - - Args: - filepath: file path - tbname: table name - - Returns: - SUCCESS or UPDATA_DATA_FAILED - - Attributes - DisconnectionError: Connect to database error - SQLAlchemyError: SQLAlchemy Error - Error: Error - """ - try: - if filepath is None or not os.path.exists(filepath): - return jsonify(ResponseCode.response_json( - ResponseCode.SPECIFIED_FILE_NOT_EXIST)) - yaml_file_list = self._get_all_yaml_name(filepath) - if not yaml_file_list: - return jsonify(ResponseCode.response_json( - ResponseCode.EMPTY_FOLDER)) - yaml_data_list_result = self._read_yaml_file(filepath) - yaml_data_list = self._verification_yaml_data_list( - yaml_data_list_result) - if yaml_data_list is None: - return jsonify(ResponseCode.response_json( - ResponseCode.YAML_FILE_ERROR)) - result = self._save_in_database( - yaml_data_list) - return result - except (DisconnectionError, SQLAlchemyError, Error) as error: - current_app.logger.error(error) - return jsonify(ResponseCode.response_json( - ResponseCode.UPDATA_DATA_FAILED)) - - def _update_single_package_info( - self, srcname, maintainer, maintainlevel): - """ - Update the maintainer field and maintainlevel - field of a single package - - Args: - srcname: The name of the source package - maintainer: Package maintainer - maintainlevel: Package maintenance level - - Returns: - success or failed - - Attributes - SQLAlchemyError: sqlalchemy error - DisconnectionError: Cannot connect to database error - Error: Error - """ - if not srcname: - return jsonify( - ResponseCode.response_json(ResponseCode.PACK_NAME_NOT_FOUND) - ) - if not maintainer and not maintainlevel: - return jsonify( - ResponseCode.response_json(ResponseCode.PARAM_ERROR) - ) - try: - with DBHelper(db_name='lifecycle') as database_name: - if 'packages_maintainer' not in database_name.engine.table_names(): - return jsonify(ResponseCode.response_json( - ResponseCode.TABLE_NAME_NOT_EXIST)) - update_obj = database_name.session.query( - PackagesMaintainer).filter_by(name=srcname).first() - if update_obj: - if maintainer: - update_obj.maintainer = maintainer - if maintainlevel: - update_obj.maintainlevel = maintainlevel - else: - database_name.add(PackagesMaintainer( - name=srcname, maintainer=maintainer, maintainlevel=maintainlevel - )) - database_name.session.commit() - return jsonify( - ResponseCode.response_json( - ResponseCode.SUCCESS)) - except (SQLAlchemyError, DisconnectionError, Error) as sql_error: - current_app.logger.error(sql_error) - database_name.session.rollback() - return jsonify(ResponseCode.response_json( - ResponseCode.UPDATA_DATA_FAILED - )) - - def put(self): - """ - Life cycle update information of a single package or - All packages - - Returns: - for example:: - { - "code": "", - "data": "", - "msg": "" - } - """ - schema = UpdatePackagesSchema() - data = request.get_json() - if schema.validate(data): - return jsonify( - ResponseCode.response_json(ResponseCode.PARAM_ERROR) - ) - srcname = data.get('pkg_name', None) - maintainer = data.get('maintainer', None) - maintainlevel = data.get('maintainlevel', None) - batch = data.get('batch') - filepath = data.get('filepath', None) - - if batch: - result = self._overall_process(filepath) - else: - result = self._update_single_package_info( - srcname, maintainer, maintainlevel) - return result +#!/usr/bin/python3 +""" +Life cycle related api interface +""" +import io +import json +import math +import os +from concurrent.futures import ThreadPoolExecutor + +import pandas as pd +import yaml + +from flask import request +from flask import jsonify, make_response +from flask import current_app +from flask_restful import Resource +from marshmallow import ValidationError + +from sqlalchemy.exc import DisconnectionError, SQLAlchemyError + +from packageship import system_config +from packageship.libs.configutils.readconfig import ReadConfig +from packageship.libs.exception import Error +from packageship.application.apps.package.function.constants import ResponseCode +from packageship.libs.dbutils.sqlalchemy_helper import DBHelper +from packageship.application.models.package import PackagesIssue +from packageship.application.models.package import Packages +from packageship.application.models.package import PackagesMaintainer +from packageship.libs.log import Log +from .serialize import IssueDownloadSchema, PackagesDownloadSchema, IssuePageSchema, IssueSchema +from ..package.serialize import DataFormatVerfi, UpdatePackagesSchema +from .function.gitee import Gitee as gitee + +LOGGER = Log(__name__) + + +# pylint: disable = no-self-use + +class DownloadFile(Resource): + """ + Download the content of the issue or the excel file of the package content + """ + + def _download_excel(self, file_type, table_name=None): + """ + Download excel file + """ + file_name = 'packages.xlsx' + if file_type == 'packages': + download_content = self.__get_packages_content(table_name) + else: + file_name = 'issues.xlsx' + download_content = self.__get_issues_content() + if download_content is None: + return jsonify( + ResponseCode.response_json( + ResponseCode.SERVICE_ERROR)) + pd_dataframe = self.__to_dataframe(download_content) + + _response = self.__bytes_save(pd_dataframe) + return self.__set_response_header(_response, file_name) + + def __bytes_save(self, data_frame): + """ + Save the file content in the form of a binary file stream + """ + try: + bytes_io = io.BytesIO() + writer = pd.ExcelWriter( # pylint: disable=abstract-class-instantiated + bytes_io, engine='xlsxwriter') + data_frame.to_excel(writer, sheet_name='Summary', index=False) + writer.save() + writer.close() + bytes_io.seek(0) + _response = make_response(bytes_io.getvalue()) + bytes_io.close() + return _response + except (IOError, Error) as io_error: + current_app.logger.error(io_error) + return make_response() + + def __set_response_header(self, response, file_name): + """ + Set http response header information + """ + response.headers['Content-Type'] = \ + "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" + response.headers["Cache-Control"] = "no-cache" + response.headers['Content-Disposition'] = 'attachment; filename={file_name}'.format( + file_name=file_name) + return response + + def __get_packages_content(self, table_name): + """ + Get package list information + """ + try: + with DBHelper(db_name='lifecycle') as database: + # Query all package data in the specified table + _model = Packages.package_meta(table_name) + _packageinfos = database.session.query(_model).all() + packages_dicts = PackagesDownloadSchema( + many=True).dump(_packageinfos) + return packages_dicts + + except (SQLAlchemyError, DisconnectionError) as error: + current_app.logger.error(error) + return None + + def __get_issues_content(self): + """ + Get the list of issues + """ + try: + with DBHelper(db_name='lifecycle') as database: + _issues = database.session.query(PackagesIssue).all() + issues_dicts = IssueDownloadSchema(many=True).dump(_issues) + return issues_dicts + except (SQLAlchemyError, DisconnectionError) as error: + current_app.logger.error(error) + return None + + def __to_dataframe(self, datas): + """ + Convert the obtained information into pandas content format + """ + + data_frame = pd.DataFrame(datas) + return data_frame + + def get(self, file_type): + """ + Download package collection information and isse list information + + """ + if file_type not in ['packages', 'issues']: + return jsonify( + ResponseCode.response_json( + ResponseCode.PARAM_ERROR)) + + table_name = request.args.get('table_name', None) + response = self._download_excel(file_type, table_name) + return response + + +class MaintainerView(Resource): + """ + Maintainer name collection + """ + + def __query_maintainers(self): + """ + Query the names of all maintainers in the specified table + """ + try: + with DBHelper(db_name='lifecycle') as database: + maintainers = database.session.query( + PackagesMaintainer.maintainer).group_by(PackagesMaintainer.maintainer).all() + return [maintainer_item[0] for maintainer_item in maintainers + if maintainer_item[0]] + except (SQLAlchemyError, DisconnectionError) as error: + current_app.logger.error(error) + return [] + + def get(self): + """ + Get the list of maintainers + """ + # Group query of the names of all maintainers in the current table + maintainers = self.__query_maintainers() + return jsonify(ResponseCode.response_json( + ResponseCode.SUCCESS, + maintainers)) + + +class TableColView(Resource): + """ + The default column of the package shows the interface + """ + + def __columns_names(self): + """ + Mapping of column name and title + """ + columns = [ + ('name', 'Name', True), + ('version', 'Version', True), + ('release', 'Release', True), + ('url', 'Url', True), + ('rpm_license', 'License', False), + ('feature', 'Feature', False), + ('maintainer', 'Maintainer', True), + ('maintainlevel', 'Maintenance Level', True), + ('release_time', 'Release Time', False), + ('used_time', 'Used Time', True), + ('maintainer_status', 'Maintain Status', True), + ('latest_version', 'Latest Version', False), + ('latest_version_time', 'Latest Version Release Time', False), + ('issue', 'Issue', True)] + return columns + + def __columns_mapping(self): + """ + + """ + columns = list() + for column in self.__columns_names(): + columns.append({ + 'column_name': column[0], + 'label': column[1], + 'default_selected': column[2] + }) + return columns + + def get(self): + """ + Get the default display column of the package + + """ + table_mapping_columns = self.__columns_mapping() + return jsonify( + ResponseCode.response_json( + ResponseCode.SUCCESS, + table_mapping_columns)) + + +class LifeTables(Resource): + """ + description: LifeTables + Restful API: get + ChangeLog: + """ + + def get(self): + """ + return all table names in the database + + Returns: + Return the table names in the database as a list + """ + try: + with DBHelper(db_name="lifecycle") as database_name: + # View all table names in the package-info database + all_table_names = database_name.engine.table_names() + all_table_names.remove("packages_issue") + all_table_names.remove("packages_maintainer") + return jsonify( + ResponseCode.response_json( + ResponseCode.SUCCESS, data=all_table_names) + ) + except (SQLAlchemyError, DisconnectionError, Error, ValueError) as sql_error: + LOGGER.logger.error(sql_error) + return jsonify( + ResponseCode.response_json(ResponseCode.DATABASE_NOT_FOUND) + ) + + +class IssueView(Resource): + """ + Issue content collection + """ + + def _query_issues(self, request_data): + """ + Args: + request_data: + Returns: + """ + try: + with DBHelper(db_name='lifecycle') as database: + issues_query = database.session.query(PackagesIssue.issue_id, + PackagesIssue.issue_url, + PackagesIssue.issue_title, + PackagesIssue.issue_status, + PackagesIssue.pkg_name, + PackagesIssue.issue_type, + PackagesMaintainer.maintainer). \ + outerjoin(PackagesMaintainer, + PackagesMaintainer.name == PackagesIssue.pkg_name) + if request_data.get("pkg_name"): + issues_query = issues_query.filter( + PackagesIssue.pkg_name == request_data.get("pkg_name")) + if request_data.get("issue_type"): + issues_query = issues_query.filter( + PackagesIssue.issue_type == request_data.get("issue_type")) + if request_data.get("issue_status"): + issues_query = issues_query.filter( + PackagesIssue.issue_status == request_data.get("issue_status")) + if request_data.get("maintainer"): + issues_query = issues_query.filter( + PackagesMaintainer.maintainer == request_data.get("maintainer")) + total_count = issues_query.count() + total_page = math.ceil( + total_count / int(request_data.get("page_size"))) + issues_query = issues_query.limit(request_data.get("page_size")).offset( + (int(request_data.get("page_num")) - 1) * int(request_data.get("page_size"))) + issue_dicts = IssuePageSchema( + many=True).dump(issues_query.all()) + issue_data = ResponseCode.response_json( + ResponseCode.SUCCESS, issue_dicts) + issue_data['total_count'] = total_count + issue_data['total_page'] = total_page + return issue_data + except (SQLAlchemyError, DisconnectionError) as error: + current_app.logger.error(error) + return ResponseCode.response_json(ResponseCode.DATABASE_NOT_FOUND) + + def get(self): + """ + Description: Get all issues info or one specific issue + Args: + Returns: + [ + { + "issue_id": "", + "issue_url": "", + "issue_title": "", + "issue_content": "", + "issue_status": "", + "issue_type": "" + }, + ] + Raises: + DisconnectionError: Unable to connect to database exception + AttributeError: Object does not have this property + TypeError: Exception of type + Error: Abnormal error + """ + schema = IssueSchema() + if schema.validate(request.args): + return jsonify( + ResponseCode.response_json(ResponseCode.PARAM_ERROR) + ) + issue_dict = self._query_issues(request.args) + return issue_dict + + +class IssueType(Resource): + """ + Issue type collection + """ + + def _get_issue_type(self): + """ + Description: Query issue type + Returns: + """ + try: + with DBHelper(db_name='lifecycle') as database: + issues_query = database.session.query(PackagesIssue.issue_type).group_by( + PackagesIssue.issue_type).all() + return jsonify(ResponseCode.response_json( + ResponseCode.SUCCESS, [issue_query[0] for issue_query in issues_query])) + except (SQLAlchemyError, DisconnectionError) as error: + current_app.logger.error(error) + return jsonify(ResponseCode.response_json( + ResponseCode.PARAM_ERROR)) + + def get(self): + """ + Description: Get all issues info or one specific issue + Args: + Returns: + [ + "issue_type", + "issue_type" + ] + Raises: + DisconnectionError: Unable to connect to database exception + AttributeError: Object does not have this property + TypeError: Exception of type + Error: Abnormal error + """ + return self._get_issue_type() + + +class IssueStatus(Resource): + """ + Issue status collection + """ + + def _get_issue_status(self): + """ + Description: Query issue status + Returns: + """ + try: + with DBHelper(db_name='lifecycle') as database: + issues_query = database.session.query(PackagesIssue.issue_status).group_by( + PackagesIssue.issue_status).all() + return jsonify(ResponseCode.response_json( + ResponseCode.SUCCESS, [issue_query[0] for issue_query in issues_query])) + except (SQLAlchemyError, DisconnectionError) as error: + current_app.logger.error(error) + return jsonify(ResponseCode.response_json( + ResponseCode.PARAM_ERROR)) + + def get(self): + """ + Description: Get all issues info or one specific issue + Args: + Returns: + [ + "issue_status", + "issue_status" + ] + Raises: + DisconnectionError: Unable to connect to database exception + AttributeError: Object does not have this property + TypeError: Exception of type + Error: Abnormal error + """ + return self._get_issue_status() + + +class IssueCatch(Resource): + """ + description: Catch issue content + Restful API: put + ChangeLog: + """ + + def post(self): + """ + Searching issue content + Args: + Returns: + for examples: + [ + { + "issue_id": "", + "issue_url": "", + "issue_title": "", + "issue_content": "", + "issue_status": "", + "issue_type": "" + }, + ] + Raises: + DisconnectionError: Unable to connect to database exception + AttributeError: Object does not have this property + TypeError: Exception of type + Error: Abnormal error + """ + data = json.loads(request.get_data()) + if not isinstance(data, dict): + return jsonify( + ResponseCode.response_json(ResponseCode.PARAM_ERROR)) + pkg_name = data["repository"]["path"] + try: + _readconfig = ReadConfig(system_config.SYS_CONFIG_PATH) + pool_workers = _readconfig.get_config('LIFECYCLE', 'pool_workers') + _warehouse = _readconfig.get_config('LIFECYCLE', 'warehouse') + if _warehouse is None: + _warehouse = 'src-openeuler' + if not isinstance(pool_workers, int): + pool_workers = 10 + pool = ThreadPoolExecutor(max_workers=pool_workers) + with DBHelper(db_name="lifecycle") as database: + for table_name in filter(lambda x: x not in ['packages_issue', 'packages_maintainer', 'database_info'], + database.engine.table_names()): + cls_model = Packages.package_meta(table_name) + for package_item in database.session.query(cls_model).filter( + cls_model.name == pkg_name).all(): + gitee_issue = gitee( + package_item, _warehouse, package_item.name, table_name) + pool.submit(gitee_issue.issue_hooks, data) + pool.shutdown() + return jsonify(ResponseCode.response_json(ResponseCode.SUCCESS)) + except SQLAlchemyError as error_msg: + current_app.logger.error(error_msg) + + +class UpdatePackages(Resource): + """ + description:Life cycle update information of a single package + Restful API: post + ChangeLog: + """ + + def _get_all_yaml_name(self, filepath): + """ + List of all yaml file names in the folder + + Args: + filepath: file path + + Returns: + yaml_file_list:List of all yaml file names in the folder + + Attributes: + Error:Error + NotADirectoryError:Invalid directory name + FileNotFoundError:File not found error + + """ + try: + yaml_file_list = os.listdir(filepath) + return yaml_file_list + except (Error, NotADirectoryError, FileNotFoundError) as error: + current_app.logger.error(error) + return None + + def _get_yaml_content(self, yaml_file, filepath): + """ + Read the content of the yaml file + + Args: + yaml_file: yaml file + filepath: file path + + Returns: + Return a dictionary containing name, maintainer and maintainlevel + """ + yaml_data_dict = dict() + if not yaml_file.endswith(".yaml"): + return None + pkg_name = yaml_file.rsplit('.yaml')[0] + single_yaml_path = os.path.join(filepath, yaml_file) + with open(single_yaml_path, 'r', encoding='utf-8') as file_context: + yaml_flie_data = yaml.load( + file_context.read(), Loader=yaml.FullLoader) + if yaml_flie_data is None or not isinstance(yaml_flie_data, dict): + return None + maintainer = yaml_flie_data.get("maintainer") + maintainlevel = yaml_flie_data.get("maintainlevel") + yaml_data_dict['name'] = pkg_name + if maintainer: + yaml_data_dict['maintainer'] = maintainer + if maintainlevel: + yaml_data_dict['maintainlevel'] = maintainlevel + return yaml_data_dict + + def _read_yaml_file(self, filepath): + """ + Read the yaml file and combine the data of the nested dictionary of the list + + Args: + filepath: file path + + Returns: + yaml.YAMLError:yaml file error + SQLAlchemyError:SQLAlchemy Error + DisconnectionError:Connect to database error + Error:Error + """ + yaml_file_list = self._get_all_yaml_name(filepath) + if not yaml_file_list: + return None + try: + yaml_data_list = list() + _readconfig = ReadConfig(system_config.SYS_CONFIG_PATH) + pool_workers = _readconfig.get_config('LIFECYCLE', 'pool_workers') + if not isinstance(pool_workers, int): + pool_workers = 10 + with ThreadPoolExecutor(max_workers=pool_workers) as pool: + for yaml_file in yaml_file_list: + pool_result = pool.submit( + self._get_yaml_content, yaml_file, filepath) + yaml_data_dict = pool_result.result() + yaml_data_list.append(yaml_data_dict) + return yaml_data_list + except (yaml.YAMLError, SQLAlchemyError, DisconnectionError, Error) as error: + current_app.logger.error(error) + return None + + def _verification_yaml_data_list(self, yaml_data_list): + """ + Verify the data obtained in the yaml file + + Args: + yaml_data_list: yaml data list + + Returns: + yaml_data_list: After verification yaml data list + + Attributes: + ValidationError: Validation error + + """ + try: + DataFormatVerfi(many=True).load(yaml_data_list) + return yaml_data_list + except ValidationError as error: + current_app.logger.error(error.messages) + return None + + def _save_in_database(self, yaml_data_list): + """ + Save the data to the database + + Args: + tbname: Table Name + name_separate_list: Split name list + _update_pack_data: Split new list of combined data + + Returns: + SUCCESS or UPDATA_DATA_FAILED + + Attributes + DisconnectionError: Connect to database error + SQLAlchemyError: SQLAlchemy Error + Error: Error + + """ + try: + with DBHelper(db_name="lifecycle") as database_name: + if 'packages_maintainer' not in database_name.engine.table_names(): + return jsonify(ResponseCode.response_json( + ResponseCode.TABLE_NAME_NOT_EXIST)) + database_name.session.begin(subtransactions=True) + for yaml_data in yaml_data_list: + name = yaml_data.get("name") + maintainer = yaml_data.get("maintainer") + maintainlevel = yaml_data.get("maintainlevel") + packages_maintainer_obj = database_name.session.query( + PackagesMaintainer).filter_by(name=name).first() + if packages_maintainer_obj: + if maintainer: + packages_maintainer_obj.maintainer = maintainer + if maintainlevel: + packages_maintainer_obj.maintainlevel = maintainlevel + else: + database_name.add(PackagesMaintainer( + name=name, maintainer=maintainer, maintainlevel=maintainlevel + )) + database_name.session.commit() + return jsonify(ResponseCode.response_json( + ResponseCode.SUCCESS)) + except (DisconnectionError, SQLAlchemyError, Error, AttributeError) as error: + current_app.logger.error(error) + return jsonify(ResponseCode.response_json( + ResponseCode.UPDATA_DATA_FAILED)) + + def _overall_process( + self, + filepath): + """ + Call each method to complete the entire function + + Args: + filepath: file path + tbname: table name + + Returns: + SUCCESS or UPDATA_DATA_FAILED + + Attributes + DisconnectionError: Connect to database error + SQLAlchemyError: SQLAlchemy Error + Error: Error + """ + try: + if filepath is None or not os.path.exists(filepath): + return jsonify(ResponseCode.response_json( + ResponseCode.SPECIFIED_FILE_NOT_EXIST)) + yaml_file_list = self._get_all_yaml_name(filepath) + if not yaml_file_list: + return jsonify(ResponseCode.response_json( + ResponseCode.EMPTY_FOLDER)) + yaml_data_list_result = self._read_yaml_file(filepath) + yaml_data_list = self._verification_yaml_data_list( + yaml_data_list_result) + if yaml_data_list is None: + return jsonify(ResponseCode.response_json( + ResponseCode.YAML_FILE_ERROR)) + result = self._save_in_database( + yaml_data_list) + return result + except (DisconnectionError, SQLAlchemyError, Error) as error: + current_app.logger.error(error) + return jsonify(ResponseCode.response_json( + ResponseCode.UPDATA_DATA_FAILED)) + + def _update_single_package_info( + self, srcname, maintainer, maintainlevel): + """ + Update the maintainer field and maintainlevel + field of a single package + + Args: + srcname: The name of the source package + maintainer: Package maintainer + maintainlevel: Package maintenance level + + Returns: + success or failed + + Attributes + SQLAlchemyError: sqlalchemy error + DisconnectionError: Cannot connect to database error + Error: Error + """ + if not srcname: + return jsonify( + ResponseCode.response_json(ResponseCode.PACK_NAME_NOT_FOUND) + ) + if not maintainer and not maintainlevel: + return jsonify( + ResponseCode.response_json(ResponseCode.PARAM_ERROR) + ) + try: + with DBHelper(db_name='lifecycle') as database_name: + if 'packages_maintainer' not in database_name.engine.table_names(): + return jsonify(ResponseCode.response_json( + ResponseCode.TABLE_NAME_NOT_EXIST)) + update_obj = database_name.session.query( + PackagesMaintainer).filter_by(name=srcname).first() + if update_obj: + if maintainer: + update_obj.maintainer = maintainer + if maintainlevel: + update_obj.maintainlevel = maintainlevel + else: + database_name.add(PackagesMaintainer( + name=srcname, maintainer=maintainer, maintainlevel=maintainlevel + )) + database_name.session.commit() + return jsonify( + ResponseCode.response_json( + ResponseCode.SUCCESS)) + except (SQLAlchemyError, DisconnectionError, Error) as sql_error: + current_app.logger.error(sql_error) + database_name.session.rollback() + return jsonify(ResponseCode.response_json( + ResponseCode.UPDATA_DATA_FAILED + )) + + def put(self): + """ + Life cycle update information of a single package or + All packages + + Returns: + for example:: + { + "code": "", + "data": "", + "msg": "" + } + """ + schema = UpdatePackagesSchema() + data = request.get_json() + if schema.validate(data): + return jsonify( + ResponseCode.response_json(ResponseCode.PARAM_ERROR) + ) + srcname = data.get('pkg_name', None) + maintainer = data.get('maintainer', None) + maintainlevel = data.get('maintainlevel', None) + batch = data.get('batch') + filepath = data.get('filepath', None) + + if batch: + result = self._overall_process(filepath) + else: + result = self._update_single_package_info( + srcname, maintainer, maintainlevel) + return result diff -Naru a/packageship/application/apps/package/function/be_depend.py b/packageship/application/apps/package/function/be_depend.py --- a/packageship/application/apps/package/function/be_depend.py 2020-09-22 23:34:04.037937224 +0800 +++ b/packageship/application/apps/package/function/be_depend.py 2020-09-22 23:48:32.402476132 +0800 @@ -5,11 +5,12 @@ This includes both install and build dependencies Class: BeDepend """ +import copy +from collections import namedtuple, defaultdict from flask import current_app from sqlalchemy import text from sqlalchemy.exc import SQLAlchemyError from sqlalchemy.sql import literal_column -from packageship.application.apps.package.function.constants import ResponseCode from packageship.application.models.package import SrcPack from packageship.libs.dbutils import DBHelper @@ -36,6 +37,8 @@ self.source_name_set = set() self.bin_name_set = set() self.result_dict = dict() + self.comm_install_builds = defaultdict(set) + self.provides_name = set() def main(self): """ @@ -69,14 +72,16 @@ [["root", None]] ] self.source_name_set.add(self.source_name) - self.package_bedepend( + self._provides_bedepend( [self.source_name], data_base, package_type='src') + for _, value in self.result_dict.items(): + value[-1] = list(value[-1]) return self.result_dict - def package_bedepend(self, pkg_name_list, data_base, package_type): + def _get_provides(self, pkg_name_list, data_base, package_type): """ - Description: Query the dependent function + Description: Query the components provided by the required package Args: pkg_name_list:source or binary packages name data_base: database @@ -84,35 +89,31 @@ Returns: Raises: SQLAlchemyError: Database connection exception - """ - + """ + res = namedtuple( + 'restuple', [ + 'search_bin_name', 'search_bin_version', 'source_name']) sql_com = """ - SELECT DISTINCT b1.name AS search_bin_name, + SELECT DISTINCT b1.name AS search_bin_name, b1.version AS search_bin_version, b1.src_name AS source_name, - b2.name AS bin_name, - s1.name AS bebuild_src_name, - b2.src_name AS install_depend_src_name + bin_provides.name As pro_name FROM ( SELECT pkgKey,src_name,name,version FROM bin_pack WHERE {} ) b1 - LEFT JOIN bin_provides ON bin_provides.pkgKey = b1.pkgKey - LEFT JOIN bin_requires br ON br.name = bin_provides.name - LEFT JOIN src_requires sr ON sr.name = bin_provides.name - LEFT JOIN src_pack s1 ON s1.pkgKey = sr.pkgKey - LEFT JOIN bin_pack b2 ON b2.pkgKey = br.pkgKey - """ + LEFT JOIN bin_provides ON bin_provides.pkgKey = b1.pkgKey;""" + # package_type if package_type == 'src': literal_name = 'src_name' - elif package_type == 'bin': literal_name = 'name' - else: - return - + # Query database + # The lower version of SQLite can look up up to 999 parameters + # simultaneously, so use 900 sharding queries try: result = [] - for input_name in (pkg_name_list[i:i+900] for i in range(0, len(pkg_name_list), 900)): + for input_name in (pkg_name_list[i:i + 900] + for i in range(0, len(pkg_name_list), 900)): name_in = literal_column(literal_name).in_(input_name) sql_str = text(sql_com.format(name_in)) result.extend(data_base.session.execute( @@ -124,74 +125,176 @@ ).fetchall()) except SQLAlchemyError as sql_err: current_app.logger.error(sql_err) - return ResponseCode.response_json(ResponseCode.CONNECT_DB_ERROR) + return if not result: return - # Source and binary packages that were found to be dependent - source_name_list = [] - bin_name_list = [] + # Process the result of the component + pro_name_dict = dict() + + _components = set() for obj in result: - if obj.source_name is None: - source_name = 'NOT FOUND' - else: - source_name = obj.source_name - if obj.bebuild_src_name: - # Determine if the source package has been checked - parent_node = obj.bebuild_src_name - be_type = "build" - # Call the spell dictionary function - self.make_dicts( - obj.search_bin_name, - source_name, + if not obj.pro_name: + continue + # De-weight components + if obj.pro_name not in self.comm_install_builds: + pro_name_dict[obj.pro_name] = res( + obj.search_bin_name, obj.search_bin_version, obj.source_name) + + if obj.search_bin_name not in self.result_dict: + self.result_dict[obj.search_bin_name] = [ + obj.source_name, obj.search_bin_version, - parent_node, - be_type) + self.db_name, + self.comm_install_builds[obj.pro_name] + if self.comm_install_builds[obj.pro_name] else {(None, None)} + ] + tmp_ = copy.deepcopy(self.comm_install_builds[obj.pro_name]) - if obj.bebuild_src_name not in self.source_name_set: - self.source_name_set.add(obj.bebuild_src_name) - source_name_list.append(obj.bebuild_src_name) - - if obj.bin_name: - # Determine if the bin package has been checked - parent_node = obj.bin_name - be_type = "install" - # Call the spell dictionary function - self.make_dicts( - obj.search_bin_name, - source_name, - obj.search_bin_version, - parent_node, - be_type) + tmp_.discard((obj.search_bin_name, 'install')) + tmp_.discard((obj.search_bin_name, 'build')) - if obj.bin_name not in self.bin_name_set: - self.bin_name_set.add(obj.bin_name) - bin_name_list.append(obj.bin_name) - - # With_sub_pack=1 - if self.with_sub_pack == "1": - if obj.install_depend_src_name not in self.source_name_set: - self.source_name_set.add( - obj.install_depend_src_name) - source_name_list.append( - obj.install_depend_src_name) - - if obj.bebuild_src_name is None and obj.bin_name is None: - parent_node = None - be_type = None - self.make_dicts( - obj.search_bin_name, - source_name, - obj.search_bin_version, - parent_node, - be_type) + if (None, None) in self.result_dict[obj.search_bin_name][-1] \ + and self.comm_install_builds[obj.pro_name]: + self.result_dict[obj.search_bin_name][-1] = tmp_ + else: + self.result_dict[obj.search_bin_name][-1].update(tmp_) + return pro_name_dict + + def _provides_bedepend(self, pkg_name_list, data_base, package_type): + """ + Description: Query the dependent function + Args: + pkg_name_list:source or binary packages name + data_base: database + package_type: package type + Returns: + Raises: + SQLAlchemyError: Database connection exception + """ + # Query component + pro_names = self._get_provides(pkg_name_list, data_base, package_type) - if len(source_name_list) != 0: - self.package_bedepend( + if not pro_names: + return + + sql_2_bin = """ + SELECT DISTINCT + b2.name AS bin_name, + b2.src_name AS install_depend_src_name, + br.name AS pro_name + FROM + ( SELECT name, pkgKey FROM bin_requires WHERE {}) br + LEFT JOIN bin_pack b2 ON b2.pkgKey = br.pkgKey; + """ + + sql_2_src = """ + SELECT DISTINCT + s1.name AS bebuild_src_name, + sr.name AS pro_name + FROM + ( SELECT name, pkgKey FROM src_requires WHERE {} ) sr + LEFT JOIN src_pack s1 ON s1.pkgKey = sr.pkgKey; + """ + + provides_name_list = [pro for pro, _ in pro_names.items()] + + result_2_bin = [] + result_2_src = [] + # Query database + try: + for input_name in ( + provides_name_list[i:i + 900] for i in range(0, len(provides_name_list), 900)): + name_in = literal_column('name').in_(input_name) + sql_str_2_bin = text(sql_2_bin.format(name_in)) + result_2_bin.extend(data_base.session.execute( + sql_str_2_bin, + { + 'name_{}'.format(i): v + for i, v in enumerate(input_name, 1) + } + ).fetchall()) + sql_str_2src = text(sql_2_src.format(name_in)) + result_2_src.extend(data_base.session.execute( + sql_str_2src, + { + 'name_{}'.format(i): v + for i, v in enumerate(input_name, 1) + } + ).fetchall()) + except SQLAlchemyError as sql_err: + current_app.logger.error(sql_err) + return + + source_name_list = [] + bin_name_list = [] + + # Process the data that the installation depends on + for bin_info in result_2_bin: + temp_bin_pkg = bin_info.bin_name + temp_sub_src_pkg = bin_info.install_depend_src_name + + #withsubpick ==1 + if self.with_sub_pack == '1' and temp_sub_src_pkg not in self.source_name_set: + self.source_name_set.add(temp_sub_src_pkg) + source_name_list.append(temp_sub_src_pkg) + + if temp_bin_pkg not in self.bin_name_set: + self.bin_name_set.add(temp_bin_pkg) + bin_name_list.append(temp_bin_pkg) + + if bin_info.pro_name not in self.comm_install_builds: + self.comm_install_builds[bin_info.pro_name] = { + (bin_info.bin_name, 'install') + } + + elif (bin_info.bin_name, 'install') not in \ + self.comm_install_builds[bin_info.pro_name]: + + self.comm_install_builds[bin_info.pro_name].add( + (bin_info.bin_name, 'install') + ) + + self.make_dicts( + pro_names.get(bin_info.pro_name).search_bin_name, + pro_names.get(bin_info.pro_name).source_name, + pro_names.get(bin_info.pro_name).search_bin_version, + bin_info.bin_name, + 'install' + ) + # Process data that is compile-dependent + for src_info in result_2_src: + if src_info.bebuild_src_name not in self.source_name_set: + self.source_name_set.add(src_info.bebuild_src_name) + source_name_list.append(src_info.bebuild_src_name) + + if src_info.pro_name not in self.comm_install_builds: + self.comm_install_builds[src_info.pro_name] = { + (src_info.bebuild_src_name, 'build') + } + elif (src_info.bebuild_src_name, 'build') not in \ + self.comm_install_builds[src_info.pro_name]: + + self.comm_install_builds[src_info.pro_name].add( + (src_info.bebuild_src_name, 'build') + ) + + self.make_dicts( + pro_names.get(src_info.pro_name).search_bin_name, + pro_names.get(src_info.pro_name).source_name, + pro_names.get(src_info.pro_name).search_bin_version, + src_info.bebuild_src_name, + 'build' + ) + # Recursively query all source packages that need to be looked up + if source_name_list: + self._provides_bedepend( source_name_list, data_base, package_type="src") - if len(bin_name_list) != 0: - self.package_bedepend(bin_name_list, data_base, package_type="bin") + # Recursively query all binary packages that need to be looked up + if bin_name_list: + self._provides_bedepend( + bin_name_list, data_base, package_type="bin") def make_dicts(self, key, source_name, version, parent_node, be_type): """ @@ -210,29 +313,27 @@ source_name, version, self.db_name, - [ - [parent_node, + { + (parent_node, be_type - ] - ] + ) + } + ] else: if be_type and parent_node: - if [None, None] in self.result_dict[key][-1]: - self.result_dict.pop(key) - self.result_dict[key] = [ - source_name, - version, - self.db_name, - [ - [parent_node, - be_type - ] - ] - ] + if (None, None) in self.result_dict[key][-1]: + self.result_dict[key][-1] = { + ( + parent_node, + be_type + ) + } - elif [parent_node, be_type] not in self.result_dict[key][-1]: - self.result_dict[key][-1].append([ - parent_node, - be_type - ]) + elif (parent_node, be_type) not in self.result_dict[key][-1]: + self.result_dict[key][-1].add( + ( + parent_node, + be_type + ) + ) diff -Naru a/packageship/libs/dbutils/sqlalchemy_helper.py b/packageship/libs/dbutils/sqlalchemy_helper.py --- a/packageship/libs/dbutils/sqlalchemy_helper.py 2020-09-22 23:34:04.037937224 +0800 +++ b/packageship/libs/dbutils/sqlalchemy_helper.py 2020-09-22 23:52:23.031681622 +0800 @@ -9,6 +9,7 @@ from sqlalchemy.orm import sessionmaker from sqlalchemy.exc import SQLAlchemyError from sqlalchemy.exc import DisconnectionError +from sqlalchemy.exc import OperationalError from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.engine.url import URL from packageship.libs.exception.ext import Error @@ -252,6 +253,8 @@ except SQLAlchemyError as sql_error: self.session.rollback() + if isinstance(sql_error, OperationalError): + raise OperationalError raise Error(sql_error) else: self.session.commit() diff -Naru a/packageship/package.ini b/packageship/package.ini --- a/packageship/package.ini 2020-09-22 23:34:04.037937224 +0800 +++ b/packageship/package.ini 2020-09-22 23:49:12.154683915 +0800 @@ -93,3 +93,5 @@ ; When this value is not set, the system will default to src-openeuler warehouse=src-openeuler +; Maximum queue length +queue_maxsize = 1000 \ No newline at end of file diff -Naru a/packageship/pkgshipd b/packageship/pkgshipd --- a/packageship/pkgshipd 2020-09-22 23:34:04.037937224 +0800 +++ b/packageship/pkgshipd 2020-09-22 23:51:57.323547247 +0800 @@ -1,6 +1,18 @@ #!/bin/bash SYS_PATH=/etc/pkgship OUT_PATH=/var/run/pkgship_uwsgi + +MEM_THRESHOLD='700' +MEM_FREE=`free -m | grep "Mem" | awk '{print $7}'` + +if [ $1 = "start" ] +then + if [ $MEM_FREE -lt $MEM_THRESHOLD ]; then + echo "[ERROR] pkgship tool does not support memory less than ${MEM_THRESHOLD} MB." + exit 0 + fi +fi + if [ ! -d "$OUT_PATH" ]; then mkdir $OUT_PATH fi diff -Naru a/test/common_files/package.ini b/test/common_files/package.ini --- a/test/common_files/package.ini 2020-09-22 23:34:04.041937245 +0800 +++ b/test/common_files/package.ini 2020-09-22 23:50:56.559229634 +0800 @@ -1,30 +1,31 @@ -[SYSTEM] -init_conf_path = C:\Users\TAO\Desktop\pkgship-1.1.0\test\common_files\conf.yaml -write_port = 8080 -query_port = 8090 -write_ip_addr = 127.0.0.1 -query_ip_addr = 127.0.0.1 -remote_host = https://api.openeuler.org/pkgmanage - -[LOG] -log_level = INFO -log_name = log_info.log -backup_count = 10 -max_bytes = 314572800 - -[UWSGI] -daemonize = /var/log/uwsgi.log -buffer-size = 65536 -http-timeout = 600 -harakiri = 600 - -[TIMEDTASK] -open = True -hour = 3 -minute = 0 - -[LIFECYCLE] -warehouse_remote = https://gitee.com/openeuler/openEuler-Advisor/raw/master/upstream-info/ -pool_workers = 10 -warehouse = src-openeuler - +[SYSTEM] +init_conf_path = +write_port = 8080 +query_port = 8090 +write_ip_addr = 127.0.0.1 +query_ip_addr = 127.0.0.1 +remote_host = https://api.openeuler.org/pkgmanage + +[LOG] +log_level = INFO +log_name = log_info.log +backup_count = 10 +max_bytes = 314572800 + +[UWSGI] +daemonize = /var/log/uwsgi.log +buffer-size = 65536 +http-timeout = 600 +harakiri = 600 + +[TIMEDTASK] +open = True +hour = 3 +minute = 0 + +[LIFECYCLE] +warehouse_remote = https://gitee.com/openeuler/openEuler-Advisor/raw/master/upstream-info/ +pool_workers = 10 +warehouse = src-openeuler +queue_maxsize = 1000 +