From ac48cfdcac8c93ba59754899237a37dac40659ac Mon Sep 17 00:00:00 2001 From: algorithmofdish Date: Fri, 18 Nov 2022 11:07:47 +0800 Subject: [PATCH] feat(infer): cause inference optimization 1. categorize metrics and select the one with the max abnormal score as the cause; 2. add a virtual metric to complete the fault propagation path; 3. use the abnormal score without normalization; 4. fix a bug where subgraphs were obtained incompletely. --- cause_inference/__main__.py | 4 +- cause_inference/arangodb.py | 24 +++-- cause_inference/cause_infer.py | 11 +-- cause_inference/infer_policy.py | 84 ++++++++++++++---- cause_inference/model.py | 64 +++++++++++--- cause_inference/rule_parser.py | 149 ++++++++++++++++++++++++-------- config/infer-rule.yaml | 98 +++++++++++++++++---- 7 files changed, 337 insertions(+), 97 deletions(-) diff --git a/cause_inference/__main__.py b/cause_inference/__main__.py index baddbe4..093f7ac 100644 --- a/cause_inference/__main__.py +++ b/cause_inference/__main__.py @@ -15,7 +15,7 @@ from cause_inference.config import init_infer_config from cause_inference.model import AbnormalEvent from cause_inference.cause_infer import cause_locating from cause_inference.cause_infer import parse_abn_evt -from cause_inference.cause_infer import normalize_abn_score +from cause_inference.cause_infer import preprocess_abn_score from cause_inference.rule_parser import rule_engine from cause_inference.exceptions import InferenceException from cause_inference.exceptions import DataParseException @@ -172,7 +172,7 @@ def get_recommend_metric_evts(abn_kpi_data: dict) -> List[AbnormalEvent]: metric_evt = AbnormalEvent( timestamp=abn_kpi_data.get('Timestamp'), abnormal_metric_id=metric_data.get('metric', ''), - abnormal_score=normalize_abn_score(metric_data.get('score')), + abnormal_score=preprocess_abn_score(metric_data.get('score')), metric_labels=metric_data.get('label', {}), desc=metric_data.get('description', '') ) diff --git a/cause_inference/arangodb.py
b/cause_inference/arangodb.py index 424e500..d7982b7 100644 --- a/cause_inference/arangodb.py +++ b/cause_inference/arangodb.py @@ -22,6 +22,14 @@ def connect_to_arangodb(arango_url, db_name): return conn.databases[db_name] +def query_all(db, aql_query, bind_vars=None, raw_results=True): + res = [] + query_hdl = db.AQLQuery(aql_query, bindVars=bind_vars, rawResults=raw_results) + for item in query_hdl: + res.append(item) + return res + + def query_recent_topo_ts(db: Database, ts): bind_vars = {'@collection': _TIMESTAMP_COLL_NAME, 'ts': ts} aql_query = ''' @@ -32,12 +40,12 @@ def query_recent_topo_ts(db: Database, ts): RETURN t._key ''' try: - query_res = db.AQLQuery(aql_query, bindVars=bind_vars, rawResults=True).response + query_res = query_all(db, aql_query, bind_vars) except AQLQueryError as ex: raise DBException(ex) from ex - if query_res.get('error') or not query_res.get('result'): + if len(query_res) == 0: raise DBException('Can not find topological graph at the abnormal timestamp {}'.format(ts)) - last_ts = query_res.get('result')[0] + last_ts = query_res[0] return int(last_ts) @@ -56,12 +64,12 @@ def query_topo_entities(db: Database, ts, query_options=None): return v '''.format(filter_str) try: - query_res = db.AQLQuery(aql_query, bindVars=bind_vars, rawResults=True).response + query_res = query_all(db, aql_query, bind_vars) except AQLQueryError as ex: raise DBException(ex) from ex - if query_res.get('error') or not query_res.get('result'): + if len(query_res) == 0: raise DBException('Can not find observe entities satisfied.') - return query_res.get('result') + return query_res def query_subgraph(db, ts, start_node_id, edge_collection, depth=1): @@ -83,12 +91,12 @@ def query_subgraph(db, ts, start_node_id, edge_collection, depth=1): return {{"vertex": v, "edge": e}} '''.format(edge_coll_str) try: - query_res = db.AQLQuery(aql_query, bindVars=bind_vars, rawResults=True).response + query_res = query_all(db, aql_query, bind_vars) except AQLQueryError as ex: 
raise DBException(ex) from ex vertices = {} edges = {} - for item in query_res.get('result'): + for item in query_res: vertex = item.get('vertex') edge = item.get('edge') vertices.setdefault(vertex.get('_id'), vertex) diff --git a/cause_inference/cause_infer.py b/cause_inference/cause_infer.py index 1954abf..dff26d0 100644 --- a/cause_inference/cause_infer.py +++ b/cause_inference/cause_infer.py @@ -23,8 +23,8 @@ from cause_inference.infer_policy import InferPolicy from cause_inference.infer_policy import get_infer_policy -def normalize_abn_score(score): - return expit(score) +def preprocess_abn_score(score): + return max(0, score) # 因果推理 @@ -172,9 +172,10 @@ def cause_locating(abnormal_kpi: AbnormalEvent, abnormal_metrics: List[AbnormalE # 5. 生成异常指标之间的因果图 causal_graph.init_metric_cause_graph() - logger.logger.debug("Causal graph nodes are: {}".format(causal_graph.entity_cause_graph.nodes)) - logger.logger.debug("Causal graph predecessors: {}".format(causal_graph.entity_cause_graph.pred)) - logger.logger.debug("Causal graph successors: {}".format(causal_graph.entity_cause_graph.succ)) + logger.logger.debug("Entity cause graph nodes are: {}".format(causal_graph.entity_cause_graph.nodes)) + logger.logger.debug("Entity cause graph edges are: {}".format(causal_graph.entity_cause_graph.edges)) + logger.logger.debug("Metric cause graph nodes are: {}".format(causal_graph.metric_cause_graph.nodes)) + logger.logger.debug("Metric cause graph edges are: {}".format(causal_graph.metric_cause_graph.edges)) # 6. 
以故障传播图 + 异常KPI为输入,执行根因推导算法,输出 topK 根因指标 infer_policy = get_infer_policy(infer_config.infer_conf.get('infer_policy')) diff --git a/cause_inference/infer_policy.py b/cause_inference/infer_policy.py index c0952e7..0663e74 100644 --- a/cause_inference/infer_policy.py +++ b/cause_inference/infer_policy.py @@ -5,9 +5,10 @@ from typing import List import networkx as nx -from cause_inference.cause_infer import CausalGraph -from cause_inference.cause_infer import Cause +from cause_inference.model import CausalGraph +from cause_inference.model import Cause from cause_inference.exceptions import InferenceException +from cause_inference import rule_parser class InferPolicy(ABC): @@ -114,22 +115,28 @@ class DfsPolicy(InferPolicy): if length < 1: return 0.0 total_score = 0.0 + num_of_valid_node = 0 for node in path[:length]: + if is_virtual_node(node): + continue total_score += cause_graph.nodes[node].get('abnormal_score', 0) - if length != 0: - total_score /= length + num_of_valid_node += 1 + if num_of_valid_node != 0: + total_score /= num_of_valid_node return total_score - def infer(self, causal_graph: CausalGraph, top_k: int) -> List[Cause]: - cause_graph = causal_graph.metric_cause_graph - abn_node_id = (causal_graph.entity_id_of_abn_kpi, causal_graph.abnormal_kpi.abnormal_metric_id) - - reverse_graph = nx.DiGraph() - reverse_graph.add_nodes_from(cause_graph.nodes) + @staticmethod + def reverse_graph(cause_graph): + reversed_graph = nx.DiGraph() + reversed_graph.add_nodes_from(cause_graph.nodes) for from_, to in cause_graph.edges: - reverse_graph.add_edge(to, from_) + reversed_graph.add_edge(to, from_) + return reversed_graph - successors = nx.dfs_successors(reverse_graph, abn_node_id) + @staticmethod + def get_all_paths_to_abn_node(abn_node_id, cause_graph): + reversed_graph = DfsPolicy.reverse_graph(cause_graph) + successors = nx.dfs_successors(reversed_graph, abn_node_id) paths = [] path = [] @@ -145,24 +152,63 @@ class DfsPolicy(InferPolicy): path.append(abn_node_id) 
dfs_path(abn_node_id) + return paths + @staticmethod + def get_scored_paths(cause_graph, paths) -> list: scored_paths = [] - for p in paths: + for path in paths: scored_paths.append({ - 'score': self.calc_path_score(p, cause_graph), - 'path': p + 'score': DfsPolicy.calc_path_score(path, cause_graph), + 'path': path }) - scored_paths = sorted(scored_paths, key=lambda k: k['score'], reverse=True) - scored_paths = scored_paths[:top_k] + return scored_paths + + @staticmethod + def get_top_paths(scored_paths, top_k) -> list: + top_paths = [] + node_selected = set() + metric_selected = set() + for path in scored_paths: + if len(top_paths) == top_k: + break + cause_node_id = path.get('path')[0] + if cause_node_id in node_selected: + continue + if cause_node_id[1] in metric_selected: + continue + if is_virtual_node(cause_node_id): + continue + node_selected.add(cause_node_id) + metric_selected.add(cause_node_id[1]) + top_paths.append(path) + + return top_paths + @staticmethod + def get_top_causes(top_paths) -> List[Cause]: res = [] - for item in scored_paths: + for item in top_paths: cause_node_id = item.get('path')[0] cause = Cause(cause_node_id[1], cause_node_id[0], item.get('score'), item.get('path')) res.append(cause) - return res + def infer(self, causal_graph: CausalGraph, top_k: int) -> List[Cause]: + cause_graph = causal_graph.metric_cause_graph + abn_node_id = (causal_graph.entity_id_of_abn_kpi, causal_graph.abnormal_kpi.abnormal_metric_id) + + paths = self.get_all_paths_to_abn_node(abn_node_id, cause_graph) + scored_paths = self.get_scored_paths(cause_graph, paths) + scored_paths = sorted(scored_paths, key=lambda k: k['score'], reverse=True) + top_paths = self.get_top_paths(scored_paths, top_k) + + return self.get_top_causes(top_paths) + + +def is_virtual_node(node_id) -> bool: + return rule_parser.is_virtual_metric(node_id[1]) + def get_infer_policy(policy: str, **options) -> InferPolicy: if policy == 'dfs': diff --git a/cause_inference/model.py 
b/cause_inference/model.py index eb473da..136e4c3 100644 --- a/cause_inference/model.py +++ b/cause_inference/model.py @@ -7,6 +7,7 @@ from spider.conf.observe_meta import ObserveMetaMgt from spider.util import logger from spider.util.entity import concate_entity_id from spider.util.entity import escape_entity_id +from cause_inference import rule_parser class AbnormalEvent: @@ -77,6 +78,10 @@ class CausalGraph: self.metric_cause_graph = nx.DiGraph() self.init_casual_graph() + @staticmethod + def is_virtual_metric_group(metric_group) -> bool: + return len(metric_group) == 1 and rule_parser.is_virtual_metric(metric_group[0]) + def init_casual_graph(self): for node_id, node_attrs in self.topo_nodes.items(): self.entity_cause_graph.add_node(node_id, **node_attrs) @@ -134,19 +139,54 @@ class CausalGraph: self.init_metric_edge(edge) def init_metric_edge(self, entity_edge): - from_entity_id = entity_edge[0] - to_entity_id = entity_edge[1] + f_entity_id = entity_edge[0] + t_entity_id = entity_edge[1] + avail_relations = self.get_avail_metric_causal_relations(entity_edge) + + unique = set() + for f_metric_group, t_metric_group in avail_relations: + if self.is_virtual_metric_group(f_metric_group): + self.add_virtual_metric_node(f_entity_id, f_metric_group[0]) + if self.is_virtual_metric_group(t_metric_group): + self.add_virtual_metric_node(t_entity_id, t_metric_group[0]) + + f_metric_id = self.metric_with_largest_abn_score(f_metric_group, f_entity_id) + t_metric_id = self.metric_with_largest_abn_score(t_metric_group, t_entity_id) + if (f_metric_id, t_metric_id) not in unique: + self.metric_cause_graph.add_edge((f_entity_id, f_metric_id), (t_entity_id, t_metric_id)) + unique.add((f_metric_id, t_metric_id)) + + def add_virtual_metric_node(self, entity_id, metric_id): + self.metric_cause_graph.add_node((entity_id, metric_id)) + + def get_avail_metric_causal_relations(self, entity_edge): + f_metric_ids = self.get_abn_metric_ids(entity_edge[0]) + t_metric_ids = 
self.get_abn_metric_ids(entity_edge[1]) rule_meta = self.entity_cause_graph.edges[entity_edge].get('rule_meta') - for from_metric_evt in self.get_abnormal_metrics_of_node(from_entity_id): - for to_metric_evt in self.get_abnormal_metrics_of_node(to_entity_id): - from_metric_id = from_metric_evt.abnormal_metric_id - to_metric_id = to_metric_evt.abnormal_metric_id - if rule_meta is not None and not rule_meta.check_metric_pair(from_metric_id, to_metric_id): - continue - self.metric_cause_graph.add_edge( - (from_entity_id, from_metric_id), - (to_entity_id, to_metric_id) - ) + return rule_meta.get_avail_causal_relations(f_metric_ids, t_metric_ids) + + def get_abn_metric_ids(self, entity_id): + metric_evts = self.get_abnormal_metrics_of_node(entity_id) + return [evt.abnormal_metric_id for evt in metric_evts] + + def metric_with_largest_abn_score(self, metric_group: list, entity_id) -> str: + if len(metric_group) == 1: + return metric_group[0] + + metric_evt_map = {} + metric_evts = self.get_abnormal_metrics_of_node(entity_id) + for metric_evt in metric_evts: + metric_evt_map.setdefault(metric_evt.abnormal_metric_id, metric_evt) + + metric_id_of_largest = metric_group[0] + largest_abn_score = metric_evt_map.get(metric_id_of_largest).abnormal_score + for metric_id in metric_group: + abn_score = metric_evt_map.get(metric_id).abnormal_score + if abn_score > largest_abn_score: + metric_id_of_largest = metric_id + largest_abn_score = abn_score + + return metric_id_of_largest class Cause: diff --git a/cause_inference/rule_parser.py b/cause_inference/rule_parser.py index f08ed3c..eb6227c 100644 --- a/cause_inference/rule_parser.py +++ b/cause_inference/rule_parser.py @@ -1,7 +1,7 @@ import os from abc import ABCMeta from abc import abstractmethod -from typing import List +from typing import List, Dict, Tuple import yaml @@ -9,33 +9,86 @@ from spider.conf.observe_meta import RelationType from spider.conf.observe_meta import EntityType from spider.util import logger 
+METRIC_CATEGORY_ALL = 'ALL' +METRIC_CATEGORY_OTHER = 'OTHER' +METRIC_CATEGORY_VIRTUAL = 'VIRTUAL' +METRIC_ID_OF_CATEGORY_VIRTUAL = 'virtual_metric' -class MetricPairSet: - def __init__(self, from_: set, to_: set): + +def is_virtual_metric(metric_id: str) -> bool: + return metric_id == METRIC_ID_OF_CATEGORY_VIRTUAL + + +class MetricCategoryPair: + def __init__(self, from_: str, to_: str): self.from_ = from_ self.to_ = to_ - def check_metric_pair(self, from_metric_id: str, to_metric_id: str) -> bool: - if self.from_ and from_metric_id not in self.from_: - return False - if self.to_ and to_metric_id not in self.to_: - return False - return True - class RuleMeta: - def __init__(self, from_type, to_type, metric_range=None): + def __init__(self, from_type, to_type, from_categories=None, to_categories=None, metric_range=None): self.from_type = from_type self.to_type = to_type - self.metric_range: List[MetricPairSet] = metric_range or [] - - def check_metric_pair(self, from_metric_id: str, to_metric_id: str) -> bool: - if not self.metric_range: - return True - for item in self.metric_range: - if item.check_metric_pair(from_metric_id, to_metric_id): - return True - return False + self.from_categories = from_categories or {} + self.to_categories = to_categories or {} + self.category_pairs: List[MetricCategoryPair] = metric_range or [] + + @staticmethod + def aggregate_metric_from_groups(category_type, metric_groups) -> List[list]: + res = [] + if category_type == METRIC_CATEGORY_ALL: + for cate_type, metric_group in metric_groups.items(): + if cate_type == METRIC_CATEGORY_VIRTUAL: + continue + elif cate_type == METRIC_CATEGORY_OTHER: + res.extend([metric] for metric in metric_group) + else: + res.append(metric_group) + else: + metric_group = metric_groups.get(category_type) + if metric_group: + res.append(metric_group) + + return res + + @staticmethod + def _group_metric_by_category(metrics, categories) -> Dict[str, list]: + parts = {} + parted_metrics = set() + for 
cate_type, cate_metrics in categories.items(): + part = [] + for metric in metrics: + if metric in cate_metrics: + part.append(metric) + parted_metrics.add(metric) + if len(part) > 0: + parts.setdefault(cate_type, part) + + other_part = [] + for metric in metrics: + if metric not in parted_metrics: + other_part.append(metric) + if len(other_part) > 0: + parts.setdefault(METRIC_CATEGORY_OTHER, other_part) + + virtual_part = [METRIC_ID_OF_CATEGORY_VIRTUAL] + parts.setdefault(METRIC_CATEGORY_VIRTUAL, virtual_part) + + return parts + + def get_avail_causal_relations(self, real_from_metrics, real_to_metrics) -> List[Tuple[list, list]]: + causal_relations = [] + + from_groups = self._group_metric_by_category(real_from_metrics, self.from_categories) + to_groups = self._group_metric_by_category(real_to_metrics, self.to_categories) + for cate_pair in self.category_pairs: + all_from_metrics = self.aggregate_metric_from_groups(cate_pair.from_, from_groups) + all_to_metrics = self.aggregate_metric_from_groups(cate_pair.to_, to_groups) + for from_metrics in all_from_metrics: + for to_metrics in all_to_metrics: + causal_relations.append((from_metrics, to_metrics)) + + return causal_relations class Rule(metaclass=ABCMeta): @@ -180,7 +233,8 @@ class NicRule1(Rule): class RuleEngine: def __init__(self): self.rules: List[Rule] = [] - self.rule_metas = {} + self.metric_categories = {} + self.rule_metas: Dict[tuple, RuleMeta] = {} def add_rule(self, rule: Rule): self.rules.append(rule) @@ -190,33 +244,58 @@ class RuleEngine: rule.rule_parsing(causal_graph) def load_rule_meta_from_yaml(self, rule_path: str) -> bool: - abs_rule_path = os.path.abspath(rule_path) - if not os.path.exists(abs_rule_path): - logger.logger.warning("Rule meta path '{}' not exist", abs_rule_path) - return True try: - with open(abs_rule_path, 'r') as file: + with open(os.path.abspath(rule_path), 'r') as file: data = yaml.safe_load(file) except IOError as ex: logger.logger.warning(ex) return False - infer_rules = 
data.get("infer_rules", []) - for rule_meta in infer_rules: - saved_metric_range = [] - for item in rule_meta.get("metric_range", []): - saved_metric_range.append(MetricPairSet(set(item.get('from', [])), set(item.get('to', [])))) - saved_rule_meta = RuleMeta(rule_meta.get('from_type'), rule_meta.get('to_type'), saved_metric_range) - self.rule_metas.setdefault((rule_meta.get("from_type"), rule_meta.get("to_type")), saved_rule_meta) - + self.load_rule_meta_from_dict(data) return True + def create_default_rule_meta(self, from_type, to_type): + return RuleMeta( + from_type, + to_type, + self.metric_categories.get(from_type), + self.metric_categories.get(to_type), + [MetricCategoryPair(METRIC_CATEGORY_ALL, METRIC_CATEGORY_ALL)] + ) + def add_rule_meta(self, causal_graph): entity_cause_graph = causal_graph.entity_cause_graph for edge in entity_cause_graph.edges: from_type = entity_cause_graph.nodes[edge[0]].get('type') to_type = entity_cause_graph.nodes[edge[1]].get('type') - entity_cause_graph.edges[edge]["rule_meta"] = self.rule_metas.get((from_type, to_type)) + rule_meta = self.rule_metas.get((from_type, to_type)) + if not rule_meta: + rule_meta = self.create_default_rule_meta(from_type, to_type) + entity_cause_graph.edges[edge]["rule_meta"] = rule_meta + + def load_rule_meta_from_dict(self, data: dict): + self.load_metric_categories(data.get('metric_categories', {})) + self.load_infer_rules(data.get("infer_rules", [])) + + def load_metric_categories(self, metric_categories: dict): + for entity_type, categories in metric_categories.items(): + category_dict = {} + for category in categories: + category_dict.setdefault(category.get('category'), category.get('metrics')) + self.metric_categories.setdefault(entity_type, category_dict) + + def load_infer_rules(self, infer_rules: list): + for rule_meta in infer_rules: + from_entity_type = rule_meta.get('from_type') + to_entity_type = rule_meta.get('to_type') + saved_metric_range = [] + for item in 
rule_meta.get("metric_range", []): + from_category = item.get('from') + to_category = item.get('to') + saved_metric_range.append(MetricCategoryPair(from_category, to_category)) + saved_rule_meta = RuleMeta(from_entity_type, to_entity_type, self.metric_categories.get(from_entity_type), + self.metric_categories.get(to_entity_type), saved_metric_range) + self.rule_metas.setdefault((from_entity_type, to_entity_type), saved_rule_meta) rule_engine = RuleEngine() diff --git a/config/infer-rule.yaml b/config/infer-rule.yaml index c3bc3f3..d287972 100644 --- a/config/infer-rule.yaml +++ b/config/infer-rule.yaml @@ -1,32 +1,98 @@ +metric_categories: + proc: + - + category: PROC_CPU + metrics: + - gala_gopher_proc_utime_jiffies + - gala_gopher_proc_stime_jiffies + - + category: PROC_IO_LOAD + metrics: + - gala_gopher_proc_read_bytes + - gala_gopher_proc_write_bytes + - gala_gopher_proc_less_4k_io_read + - gala_gopher_proc_less_4k_io_write + - gala_gopher_proc_greater_4k_io_read + - gala_gopher_proc_greater_4k_io_write + disk: + - + category: DISK_LOAD + metrics: + - gala_gopher_disk_rspeed_kB + - gala_gopher_disk_wspeed_kB + - gala_gopher_disk_rspeed + - gala_gopher_disk_wspeed + - + category: DISK_DELAY + metrics: + - gala_gopher_disk_r_await + - gala_gopher_disk_w_await + - gala_gopher_disk_rareq + - gala_gopher_disk_wareq + block: + - + category: BLOCK_DELAY + metrics: + - gala_gopher_block_latency_req_max + - gala_gopher_block_latency_req_last + - gala_gopher_block_latency_req_sum + - gala_gopher_block_latency_req_jitter + nic: + - + category: NIC_DROP + metrics: + - gala_gopher_nic_tc_sent_drop + - gala_gopher_nic_tx_dropped + - gala_gopher_nic_rx_dropped + cpu: + - + category: CPU_TOTAL + metrics: + - gala_gopher_cpu_total_used_per + infer_rules: - from_type: cpu to_type: proc metric_range: - - from: [] - to: - - gala_gopher_proc_utime_jiffies - - gala_gopher_proc_stime_jiffies + from: CPU_TOTAL + to: PROC_CPU - from_type: block to_type: proc metric_range: - - from: [] 
- to: - - gala_gopher_proc_ns_ext4_read - - gala_gopher_proc_ns_ext4_write - - gala_gopher_proc_ns_ext4_flush - - gala_gopher_proc_ns_overlay_read - - gala_gopher_proc_ns_overlay_write - - gala_gopher_proc_ns_overlay_flush + from: BLOCK_DELAY + to: VIRTUAL - from_type: proc to_type: disk metric_range: - - from: - - gala_gopher_proc_read_bytes - - gala_gopher_proc_write_bytes - to: [] \ No newline at end of file + from: PROC_IO_LOAD + to: ALL + - + from_type: disk + to_type: block + metric_range: + - + from: DISK_DELAY + to: BLOCK_DELAY + - + from_type: proc + to_type: sli + metric_range: + - + from: VIRTUAL + to: ALL + - + from: PROC_CPU + to: ALL + - + from_type: tcp_link + to_type: proc + metric_range: + - + from: ALL + to: VIRTUAL \ No newline at end of file -- 2.21.0.windows.1