From 0b2243b61fe5083784e634db0d97c2888330eb0a Mon Sep 17 00:00:00 2001 From: lizhenxing11 Date: Mon, 14 Nov 2022 19:43:17 +0800 Subject: [PATCH 1/2] Update sys io latency detector model Add tcp detector and update model fix code check issue --- anteater/core/anomaly.py | 8 ++ anteater/core/kpi.py | 2 +- anteater/model/three_sigma.py | 37 ++++++ anteater/module/app_sli_detector.py | 93 ++++++++------- anteater/module/detector.py | 54 +++++++-- anteater/module/proc_io_latency_detector.py | 49 +++++--- anteater/module/sys_io_latency_detector.py | 109 +++++++++--------- anteater/module/sys_tcp_establish_detector.py | 108 +++++++++-------- .../module/sys_tcp_transmission_detector.py | 76 ++++++------ anteater/provider/base.py | 14 ++- anteater/source/anomaly_report.py | 1 + anteater/template/template.py | 2 +- anteater/utils/common.py | 12 ++ config/gala-anteater.yaml | 6 +- config/module/app_sli_rtt.json | 15 ++- config/module/proc_io_latency.json | 14 ++- config/module/sys_io_latency.json | 9 +- config/module/sys_tcp_establish.json | 5 +- config/module/sys_tcp_transmission.json | 33 +++++- 19 files changed, 416 insertions(+), 231 deletions(-) create mode 100644 anteater/model/three_sigma.py diff --git a/anteater/core/anomaly.py b/anteater/core/anomaly.py index cce7767..b95eeab 100644 --- a/anteater/core/anomaly.py +++ b/anteater/core/anomaly.py @@ -13,6 +13,8 @@ from dataclasses import dataclass +from anteater.utils.time_series import TimeSeries + @dataclass class Anomaly: @@ -21,3 +23,9 @@ class Anomaly: score: float = None entity_name: str = None description: str = None + + +@dataclass +class CauseMetric: + ts: TimeSeries + score: float diff --git a/anteater/core/kpi.py b/anteater/core/kpi.py index db4c046..620ffdf 100644 --- a/anteater/core/kpi.py +++ b/anteater/core/kpi.py @@ -21,5 +21,5 @@ class KPI: entity_name: str = None enable: bool = False description: str = "" - parameter: dict = field(default=dict) + params: dict = field(default=dict) diff --git a/anteater/model/three_sigma.py b/anteater/model/three_sigma.py new file mode 100644 index 0000000..08d05ba --- /dev/null +++ b/anteater/model/three_sigma.py @@ -0,0 +1,37 @@ +#!/usr/bin/python3 +# ****************************************************************************** +# Copyright (c) 2022 Huawei Technologies Co., Ltd. +# gala-anteater is licensed under Mulan PSL v2. +# You can use this software according to the terms and conditions of the Mulan PSL v2. +# You may obtain a copy of Mulan PSL v2 at: +# http://license.coscl.org.cn/MulanPSL2 +# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, +# EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, +# MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. +# See the Mulan PSL v2 for more details. +# ******************************************************************************/ + + +import numpy as np + + +def three_sigma(values, obs_size, method="abs"): + """The '3-sigma rule' outlier detect function""" + if obs_size <= 0: + raise ValueError("The obs_size should great than zero!") + if len(values) <= obs_size: + raise ValueError("The obs_size should be great than values' length") + train_val = values[:-obs_size] + obs_val = values[-obs_size:] + mean = np.mean(train_val) + std = np.std(train_val) + if method == "abs": + outlier = [val for val in obs_val if abs(val - mean) > 3 * std] + elif method == 'min': + outlier = [val for val in obs_val if val < mean - 3 * std] + elif method == 'max': + outlier = [val for val in obs_val if val > mean + 3 * std] + else: + raise ValueError(f'Unknown method {method}') + + return outlier, mean, std diff --git a/anteater/module/app_sli_detector.py b/anteater/module/app_sli_detector.py index e38d53e..b69f73c 100644 --- a/anteater/module/app_sli_detector.py +++ b/anteater/module/app_sli_detector.py @@ -19,17 +19,15 @@ Description: The anomaly detector implementation on APP Sli import math from typing import List -import numpy as np - from anteater.core.anomaly import Anomaly from anteater.model.algorithms.spectral_residual import SpectralResidual -from anteater.model.slope import smooth_slope from anteater.model.smoother import conv_smooth +from anteater.model.three_sigma import three_sigma from anteater.module.detector import Detector from anteater.source.anomaly_report import AnomalyReport from anteater.source.metric_loader import MetricLoader from anteater.template.app_anomaly_template import AppAnomalyTemplate -from anteater.utils.data_load import load_kpi_feature +from anteater.utils.common import divide from anteater.utils.datetime import DateTimeManager as dt from anteater.utils.log import logger @@ -40,48 +38,52 @@ class APPSliDetector(Detector): """ def __init__(self, data_loader: MetricLoader, anomaly_report: AnomalyReport): - super().__init__(data_loader, anomaly_report) - self.kpis, self.features = load_kpi_feature('app_sli_rtt.json') + file_name = 'app_sli_rtt.json' + super().__init__(data_loader, anomaly_report, file_name) def execute_detect(self, machine_id: str): for kpi in self.kpis: - parameter = kpi.parameter if kpi.kpi_type == 'rtt': - anomalies = self.detect_rtt(kpi, machine_id, parameter) + anomalies = self.detect_rtt(kpi, machine_id) else: - anomalies = self.detect_tps(kpi, machine_id, parameter) + anomalies = self.detect_tps(kpi, machine_id) for anomaly in anomalies: - self.report(anomaly, kpi.entity_name, machine_id) + self.report(anomaly, machine_id) - def detect_rtt(self, kpi, machine_id: str, parameter: dict) -> List[Anomaly]: + def detect_rtt(self, kpi, machine_id: str) -> List[Anomaly]: """Detects rtt by rule-based model""" - start, end = dt.last(minutes=10) - time_series_list = self.data_loader.get_metric( + look_back = kpi.params.get('look_back', None) + box_pts = kpi.params.get('box_pts', None) + obs_size = kpi.params.get('obs_size', None) + outlier_ratio_th = kpi.params.get('outlier_ratio_th', None) + + start, end = dt.last(minutes=look_back) + ts_list = self.data_loader.get_metric( start, end, kpi.metric, label_name='machine_id', label_value=machine_id) - if not time_series_list: + if not ts_list: logger.warning(f'Key metric {kpi.metric} is null on the target machine {machine_id}!') return [] point_count = self.data_loader.expected_point_length(start, end) anomalies = [] - threshold = parameter['threshold'] - min_nsec = parameter['min_nsec'] - for time_series in time_series_list: - if len(time_series.values) < point_count * 0.9 or len(time_series.values) > point_count * 1.5: + for _ts in ts_list: + if len(_ts.values) < point_count * 0.9 or len(_ts.values) > point_count * 1.5: continue - score = max(smooth_slope(time_series, windows_length=13)) - if math.isnan(score) or math.isinf(score): - continue + smoothed_val = conv_smooth(_ts.values, box_pts=box_pts) + outlier, mean, std = three_sigma(smoothed_val, obs_size=obs_size, method='min') + ratio = divide(len(outlier), obs_size) - avg_nsec = np.mean(time_series.values[-13:]) - if score >= threshold and avg_nsec >= min_nsec: + if outlier and ratio >= outlier_ratio_th: + logger.info(f'Ratio: {ratio}, Outlier Ratio TH: {outlier_ratio_th}, ' + f'Mean: {mean}, Std: {std}') anomalies.append( - Anomaly(metric=time_series.metric, - labels=time_series.labels, - score=score, + Anomaly(metric=_ts.metric, + labels=_ts.labels, + score=ratio, + entity_name=kpi.entity_name, description=kpi.description)) anomalies = sorted(anomalies, key=lambda x: x.score, reverse=True) @@ -91,9 +93,14 @@ class APPSliDetector(Detector): return anomalies - def detect_tps(self, kpi, machine_id: str, parameter: dict) -> List[Anomaly]: + def detect_tps(self, kpi, machine_id: str) -> List[Anomaly]: """Detects tps by rule based model""" - start, end = dt.last(minutes=10) + look_back = kpi.params.get('look_back', None) + box_pts = kpi.params.get('box_pts', None) + obs_size = kpi.params.get('obs_size', None) + outlier_ratio_th = kpi.params.get('outlier_ratio_th', None) + + start, end = dt.last(minutes=look_back) time_series_list = self.data_loader.get_metric( start, end, kpi.metric, label_name='machine_id', label_value=machine_id) @@ -103,21 +110,21 @@ class APPSliDetector(Detector): point_count = self.data_loader.expected_point_length(start, end) anomalies = [] - threshold = parameter['threshold'] - for time_series in time_series_list: - if len(time_series.values) < point_count * 0.9 or len(time_series.values) > point_count * 1.5: + for _ts in time_series_list: + if len(_ts.values) < point_count * 0.9 or len(_ts.values) > point_count * 1.5: continue - pre_values = time_series.values[:-25] - cur_values = time_series.values[-25:] - mean = np.mean(pre_values) - std = np.std(pre_values) - outlier = [val for val in cur_values if val < mean - 3 * std] + smoothed_val = conv_smooth(_ts.values, box_pts=box_pts) + outlier, mean, std = three_sigma(smoothed_val, obs_size=obs_size, method='min') + ratio = divide(len(outlier), obs_size) - if outlier and len(outlier) >= len(cur_values) * 0.3: + if outlier and ratio >= outlier_ratio_th: + logger.info(f'Ratio: {ratio}, Outlier Ratio TH: {outlier_ratio_th}, ' + f'Mean: {mean}, Std: {std}') anomalies.append( - Anomaly(metric=time_series.metric, - labels=time_series.labels, - score=1, + Anomaly(metric=_ts.metric, + labels=_ts.labels, + score=ratio, + entity_name=kpi.entity_name, description=kpi.description)) anomalies = sorted(anomalies, key=lambda x: x.score, reverse=True) @@ -161,7 +168,8 @@ class APPSliDetector(Detector): return result[0: top_n] - def report(self, anomaly: Anomaly, entity_name: str, machine_id: str): + def report(self, anomaly: Anomaly, machine_id: str): + """Reports a single anomaly at each time""" feature_metrics = [f.metric for f in self.features] description = {f.metric: f.description for f in self.features} cause_metrics = self.detect_features(feature_metrics, machine_id, top_n=60) @@ -172,6 +180,5 @@ class APPSliDetector(Detector): 'description': description.get(cause[0].metric, '')} for cause in cause_metrics] timestamp = dt.utc_now() - template = AppAnomalyTemplate(timestamp, machine_id, anomaly.metric, entity_name) - template.labels = anomaly.labels + template = AppAnomalyTemplate(timestamp, machine_id, anomaly.metric, anomaly.entity_name) self.anomaly_report.sent_anomaly(anomaly, cause_metrics, template) diff --git a/anteater/module/detector.py b/anteater/module/detector.py index 51dabbd..bfe516e 100644 --- a/anteater/module/detector.py +++ b/anteater/module/detector.py @@ -11,37 +11,69 @@ # See the Mulan PSL v2 for more details. # ******************************************************************************/ -from abc import abstractmethod +from abc import abstractmethod, ABC from anteater.core.anomaly import Anomaly from anteater.source.anomaly_report import AnomalyReport from anteater.source.metric_loader import MetricLoader +from anteater.utils.common import same_intersection_key_value +from anteater.utils.data_load import load_kpi_feature from anteater.utils.datetime import DateTimeManager as dt from anteater.utils.log import logger from anteater.utils.timer import timer -class Detector: - """The base detector class""" - def __init__(self, data_loader: MetricLoader, anomaly_report: AnomalyReport): +class Detector(ABC): + """The anomaly detector base class""" + def __init__( + self, + data_loader: MetricLoader, + anomaly_report: AnomalyReport, + file_name: str): + """The detector base class initializer""" self.data_loader = data_loader self.anomaly_report = anomaly_report + self.kpis, self.features = load_kpi_feature(file_name) + + @staticmethod + def filter_ts(ts_list, filters): + result = [] + for _ts in ts_list: + if same_intersection_key_value(_ts.labels, filters): + result.append(_ts) + + return result @abstractmethod def execute_detect(self, machine_id): + """Executes anomaly detection on specified machine id""" + pass + + @abstractmethod + def report(self, anomaly: Anomaly, machine_id: str): + """Reports a single anomaly at each time""" pass @timer def detect(self): + """The main function of detector""" + if not self.kpis: + logger.debug(f"Null kpis in detector: {self.__class__.__name__}!") + return + logger.info(f"Run detector: {self.__class__.__name__}!") - start, end = dt.last(minutes=1) - metrics_kpi = [k.metric for k in self.kpis] - metrics_feat = [f.metric for f in self.features] - metrics = metrics_kpi + metrics_feat - machine_ids = self.data_loader.get_unique_machines(start, end, metrics) + self.pre_process() + machine_ids = self.get_unique_machine_id() for _id in machine_ids: self.execute_detect(_id) - @abstractmethod - def report(self, anomaly: Anomaly, entity_name: str, machine_id: str): + def pre_process(self): + """Executes pre-process for generating necessary parameters""" pass + + def get_unique_machine_id(self): + """Gets unique machine ids during past minutes""" + start, end = dt.last(minutes=1) + metrics = [_kpi.metric for _kpi in self.kpis] + machine_ids = self.data_loader.get_unique_machines(start, end, metrics) + return machine_ids diff --git a/anteater/module/proc_io_latency_detector.py b/anteater/module/proc_io_latency_detector.py index ee1d7c6..3ea2c51 100644 --- a/anteater/module/proc_io_latency_detector.py +++ b/anteater/module/proc_io_latency_detector.py @@ -16,11 +16,13 @@ import math from anteater.core.anomaly import Anomaly from anteater.model.algorithms.spectral_residual import SpectralResidual from anteater.model.slope import smooth_slope +from anteater.model.smoother import conv_smooth +from anteater.model.three_sigma import three_sigma from anteater.module.detector import Detector from anteater.source.anomaly_report import AnomalyReport from anteater.source.metric_loader import MetricLoader from anteater.template.sys_anomaly_template import SysAnomalyTemplate -from anteater.utils.data_load import load_kpi_feature +from anteater.utils.common import divide from anteater.utils.datetime import DateTimeManager as dt from anteater.utils.log import logger @@ -31,40 +33,50 @@ class ProcIOLatencyDetector(Detector): """ def __init__(self, data_loader: MetricLoader, anomaly_report: AnomalyReport): - super().__init__(data_loader, anomaly_report) - self.kpis, self.features = load_kpi_feature('proc_io_latency.json') + file_name = 'proc_io_latency.json' + super().__init__(data_loader, anomaly_report, file_name) def execute_detect(self, machine_id): for kpi in self.kpis: - parameter = kpi.parameter - start, end = dt.last(minutes=10) - time_series_list = self.data_loader.get_metric( + look_back = kpi.params.get('look_back', None) + box_pts = kpi.params.get('box_pts', None) + obs_size = kpi.params.get('obs_size', None) + outlier_ratio_th = kpi.params.get('outlier_ratio_th', None) + + start, end = dt.last(minutes=look_back) + ts_list = self.data_loader.get_metric( start, end, kpi.metric, label_name='machine_id', label_value=machine_id) - if not time_series_list: + if not ts_list: logger.warning(f'Key metric {kpi.metric} is null on the target machine {machine_id}!') return point_count = self.data_loader.expected_point_length(start, end) anomalies = [] - threshold = parameter['threshold'] - for time_series in time_series_list: - if len(time_series.values) < point_count * 0.9 or len(time_series.values) > point_count * 1.5: + for _ts in ts_list: + if len(_ts.values) < point_count * 0.9 or len(_ts.values) > point_count * 1.5: continue - if sum(time_series.values) == 0: + if sum(_ts.values) == 0: continue - score = max(smooth_slope(time_series, windows_length=13)) + score = max(smooth_slope(_ts, windows_length=13)) if math.isnan(score) or math.isinf(score): continue - if score > threshold: + smoothed_val = conv_smooth(_ts.values, box_pts=box_pts) + outlier, mean, std = three_sigma(smoothed_val, obs_size=obs_size, method='min') + ratio = divide(len(outlier), obs_size) + + if outlier and ratio >= outlier_ratio_th: + logger.info(f'Ratio: {ratio}, Outlier Ratio TH: {outlier_ratio_th}, ' + f'Mean: {mean}, Std: {std}') anomalies.append( - Anomaly(metric=time_series.metric, - labels=time_series.labels, + Anomaly(metric=_ts.metric, + labels=_ts.labels, score=score, + entity_name=kpi.entity_name, description=kpi.description)) anomalies = sorted(anomalies, key=lambda x: x.score, reverse=True) @@ -72,7 +84,7 @@ class ProcIOLatencyDetector(Detector): if anomalies: logger.info('Sys io latency anomalies was detected.') for anomaly in anomalies: - self.report(anomaly, kpi.entity_name, machine_id) + self.report(anomaly, machine_id) def detect_features(self, machine_id: str, top_n=3): priorities = {f.metric: f.priority for f in self.features} @@ -110,7 +122,7 @@ class ProcIOLatencyDetector(Detector): return result - def report(self, anomaly: Anomaly, entity_name: str, machine_id: str): + def report(self, anomaly: Anomaly, machine_id: str): description = {f.metric: f.description for f in self.features} cause_metrics = self.detect_features(machine_id, top_n=3) cause_metrics = [ @@ -123,6 +135,5 @@ class ProcIOLatencyDetector(Detector): cause[0].labels.get('comm', ''))} for cause in cause_metrics] timestamp = dt.utc_now() - template = SysAnomalyTemplate(timestamp, machine_id, anomaly.metric, entity_name) - template.labels = anomaly.labels + template = SysAnomalyTemplate(timestamp, machine_id, anomaly.metric, anomaly.entity_name) self.anomaly_report.sent_anomaly(anomaly, cause_metrics, template) diff --git a/anteater/module/sys_io_latency_detector.py b/anteater/module/sys_io_latency_detector.py index f459a57..c29ce72 100644 --- a/anteater/module/sys_io_latency_detector.py +++ b/anteater/module/sys_io_latency_detector.py @@ -15,12 +15,13 @@ import math from anteater.core.anomaly import Anomaly from anteater.model.algorithms.spectral_residual import SpectralResidual -from anteater.model.slope import smooth_slope +from anteater.model.smoother import conv_smooth +from anteater.model.three_sigma import three_sigma from anteater.module.detector import Detector from anteater.source.anomaly_report import AnomalyReport from anteater.source.metric_loader import MetricLoader from anteater.template.sys_anomaly_template import SysAnomalyTemplate -from anteater.utils.data_load import load_kpi_feature +from anteater.utils.common import divide, same_intersection_key_value from anteater.utils.datetime import DateTimeManager as dt from anteater.utils.log import logger @@ -31,100 +32,104 @@ class SysIOLatencyDetector(Detector): """ def __init__(self, data_loader: MetricLoader, anomaly_report: AnomalyReport): - super().__init__(data_loader, anomaly_report) - self.kpis, self.features = load_kpi_feature('sys_io_latency.json') + """The system i/o latency detector initializer""" + file_name = 'sys_io_latency.json' + super().__init__(data_loader, anomaly_report, file_name) def execute_detect(self, machine_id: str): + """Executes the detector based on machine id""" kpi = self.kpis[0] - parameter = kpi.parameter + look_back = kpi.params.get('look_back', None) + box_pts = kpi.params.get('box_pts', None) + obs_size = kpi.params.get('obs_size', None) + outlier_ratio_th = kpi.params.get('outlier_ratio_th', None) - start, end = dt.last(minutes=10) - time_series_list = self.data_loader.get_metric( - start, end, kpi.metric, label_name='machine_id', label_value=machine_id) + start, end = dt.last(minutes=look_back) + ts_list = self.data_loader.\ + get_metric(start, end, kpi.metric, label_name='machine_id', label_value=machine_id) - if not time_series_list: + if not ts_list: logger.warning(f'Key metric {kpi.metric} is null on the target machine {machine_id}!') return point_count = self.data_loader.expected_point_length(start, end) anomalies = [] - threshold = parameter['threshold'] - for time_series in time_series_list: - if len(time_series.values) < point_count * 0.9 or len(time_series.values) > point_count * 1.5: + for _ts in ts_list: + if len(_ts.values) < point_count * 0.9 or len(_ts.values) > point_count * 1.5: continue - if sum(time_series.values) == 0: + if sum(_ts.values) == 0: continue - score = max(smooth_slope(time_series, windows_length=13)) + smoothed_val = conv_smooth(_ts.values, box_pts=box_pts) + outlier, mean, std = three_sigma(smoothed_val, obs_size=obs_size, method='max') + ratio = divide(len(outlier), obs_size) - if math.isnan(score) or math.isinf(score): - continue - - if score > threshold: + if outlier and ratio >= outlier_ratio_th: + logger.info(f'Ratio: {ratio}, Outlier Ratio TH: {outlier_ratio_th}, ' + f'Mean: {mean}, Std: {std}') anomalies.append( - Anomaly(metric=time_series.metric, - labels=time_series.labels, - score=score, + Anomaly(metric=_ts.metric, + labels=_ts.labels, + score=ratio, + entity_name=kpi.entity_name, description=kpi.description)) - anomalies = sorted(anomalies, key=lambda x: x.score, reverse=True) - if anomalies: logger.info('Sys io latency anomalies was detected.') + anomalies = sorted(anomalies, key=lambda x: x.score, reverse=True) for anomaly in anomalies: - self.report(anomaly, kpi.entity_name, machine_id) + self.report(anomaly, machine_id) - def detect_features(self, machine_id: str, top_n: int): + def find_cause_metrics(self, machine_id: str, filters: dict, top_n: int): + """Detects the abnormal features and reports the caused metrics""" priorities = {f.metric: f.priority for f in self.features} start, end = dt.last(minutes=6) - time_series_list = [] + ts_list = [] for metric in priorities.keys(): - time_series = self.data_loader.get_metric( + _ts_list = self.data_loader.get_metric( start, end, metric, label_name='machine_id', label_value=machine_id) - time_series_list.extend(time_series) + for _ts in _ts_list: + if same_intersection_key_value(_ts.labels, filters): + ts_list.append(_ts) point_count = self.data_loader.expected_point_length(start, end) - sr_model = SpectralResidual(12, 24, 50) - + model = SpectralResidual(12, 24, 50) result = [] - for time_series in time_series_list: - if len(time_series.values) < point_count * 0.9 or \ - len(time_series.values) > point_count * 1.5: + for _ts in ts_list: + if len(_ts.values) < point_count * 0.9 or \ + len(_ts.values) > point_count * 1.5: continue - values = time_series.values - - if all(x == values[0] for x in values): + if all(x == _ts.values[0] for x in _ts.values): continue - scores = sr_model.compute_score(values) - score = max(scores[-13:]) - + score = max(model.compute_score(_ts.values)[-13:]) if math.isnan(score) or math.isinf(score): continue - result.append((time_series, score)) + result.append((_ts, score)) result = sorted(result, key=lambda x: x[1], reverse=True)[0: top_n] result = sorted(result, key=lambda x: priorities[x[0].metric]) return result - def report(self, anomaly: Anomaly, entity_name: str, machine_id: str): - feature_metrics = [f.metric for f in self.features] + def report(self, anomaly: Anomaly, machine_id: str): + """Reports the anomaly with it's caused metrics""" description = {f.metric: f.description for f in self.features} - cause_metrics = self.detect_features(machine_id, top_n=3) + cause_metrics = self.find_cause_metrics(machine_id, anomaly.labels, top_n=3) cause_metrics = [ - {'metric': cause[0].metric, - 'label': cause[0].labels, - 'score': cause[1], - 'description': description.get(cause[0].metric, '').format( - cause[0].labels.get('disk_name', ''), - cause[0].labels.get('tgid', ''), - cause[0].labels.get('comm', ''))} + { + 'metric': cause[0].metric, + 'label': cause[0].labels, + 'score': cause[1], + 'description': description.get(cause[0].metric, '').format( + cause[0].labels.get('disk_name', ''), + cause[0].labels.get('tgid', ''), + cause[0].labels.get('comm', '')) + } for cause in cause_metrics] timestamp = dt.utc_now() - template = SysAnomalyTemplate(timestamp, machine_id, anomaly.metric, entity_name) - template.labels = anomaly.labels + template = SysAnomalyTemplate(timestamp, machine_id, anomaly.metric, anomaly.entity_name) self.anomaly_report.sent_anomaly(anomaly, cause_metrics, template) diff --git a/anteater/module/sys_tcp_establish_detector.py b/anteater/module/sys_tcp_establish_detector.py index 4b49b25..3ca61c5 100644 --- a/anteater/module/sys_tcp_establish_detector.py +++ b/anteater/module/sys_tcp_establish_detector.py @@ -11,15 +11,17 @@ # See the Mulan PSL v2 for more details. # ******************************************************************************/ +from functools import reduce +from typing import List + import numpy as np -from anteater.core.anomaly import Anomaly +from anteater.core.anomaly import Anomaly, CauseMetric from anteater.module.detector import Detector from anteater.source.anomaly_report import AnomalyReport from anteater.source.metric_loader import MetricLoader from anteater.template.sys_anomaly_template import SysAnomalyTemplate -from anteater.utils.common import divide -from anteater.utils.data_load import load_kpi_feature +from anteater.utils.common import divide, same_intersection_key_value from anteater.utils.datetime import DateTimeManager as dt from anteater.utils.log import logger @@ -36,69 +38,85 @@ class SysTcpEstablishDetector(Detector): """ def __init__(self, data_loader: MetricLoader, anomaly_report: AnomalyReport): - super().__init__(data_loader, anomaly_report) - self.kpis, self.features = load_kpi_feature('sys_tcp_establish.json') + file_name = 'sys_tcp_establish.json' + super().__init__(data_loader, anomaly_report, file_name) - def execute_detect(self, machine_id: str): + self.mean = None + self.std = None + + def pre_process(self): + """Calculates ts values mean and std""" kpi = self.kpis[0] - start_30_minutes, _ = dt.last(minutes=30) - start_3_minutes, end = dt.last(minutes=3) + look_back = kpi.params.get('look_back', None) - pre_ts = self.data_loader.get_metric( - start_30_minutes, start_3_minutes, kpi.metric, label_name='machine_id', label_value=machine_id) - pre_establish_time = [t.values[0] for t in pre_ts if t.values] + start, _ = dt.last(minutes=look_back) + mid, _ = dt.last(minutes=3) - ts = self.data_loader.get_metric( - start_3_minutes, end, kpi.metric, label_name='machine_id', label_value=machine_id) - establish_time = [t.values[0] for t in ts if t.values] + ts_list = self.data_loader.get_metric(start, mid, kpi.metric) + establish_time = reduce(lambda x, y: x + y, [list(set(_ts.values)) for _ts in ts_list]) - mean = np.mean(pre_establish_time) - std = np.std(pre_establish_time) + self.mean = np.mean(establish_time) + self.std = np.std(establish_time) - outlier = [val for val in establish_time if abs(val - mean) > 3 * std] + def execute_detect(self, machine_id: str): + """Executes the detector based on machine id""" + kpi = self.kpis[0] + outlier_ratio_th = kpi.params.get('outlier_ratio_th', None) - if outlier and len(outlier) > len(ts) * 0.3: + start, end = dt.last(minutes=3) + ts_list = self.data_loader. \ + get_metric(start, end, kpi.metric, label_name='machine_id', label_value=machine_id) + establish_time = reduce(lambda x, y: x + y, [list(set(_ts.values)) for _ts in ts_list]) + + outlier = [val for val in establish_time if abs(val - self.mean) > 3 * self.std] + ratio = divide(len(outlier), len(establish_time)) + if outlier and ratio > outlier_ratio_th: + logger.info(f'Ratio: {ratio}, Outlier Ratio TH: {outlier_ratio_th}, ' + f'Mean: {self.mean}, Std: {self.std}') logger.info('Sys tcp establish anomalies was detected.') - if establish_time: - percentile = divide(len(outlier), len(establish_time)) - else: - percentile = 0 anomaly = Anomaly( metric=kpi.metric, labels={}, - description=kpi.description.format(percentile, min(outlier))) - self.report(anomaly, kpi.entity_name, machine_id) + entity_name=kpi.entity_name, + description=kpi.description.format(ratio, min(outlier))) + self.report(anomaly, machine_id) - def detect_features(self, machine_id: str): + def find_cause_metrics(self, machine_id: str, filters: dict) -> List[CauseMetric]: + """Detects the abnormal features and reports the caused metrics""" + priorities = {f.metric: f.priority for f in self.features} start, end = dt.last(minutes=3) - time_series_list = [] - metrics = [f.metric for f in self.features] - for metric in metrics: - time_series = self.data_loader.get_metric( - start, end, metric, label_name='machine_id', label_value=machine_id) - time_series_list.extend(time_series) + ts_list = [] + for metric in priorities.keys(): + _ts_list = self.data_loader.\ + get_metric(start, end, metric, label_name='machine_id', label_value=machine_id) + filtered_ts_list = self.filter_ts(_ts_list, filters) + ts_list.extend(filtered_ts_list) result = [] - for ts in time_series_list: - if ts.values and max(ts.values) > 0: - result.append((ts, max(ts.values))) + for _ts in ts_list: + if _ts.values and max(_ts.values) > 0: + cause_metric = CauseMetric(ts=_ts, score=max(_ts.values)) + result.append(cause_metric) - result = sorted(result, key=lambda x: x[1], reverse=True) + result = sorted(result, key=lambda x: x.score, reverse=True) return result - def report(self, anomaly: Anomaly, entity_name: str, machine_id: str): + def report(self, anomaly: Anomaly, machine_id: str): + """Reports a single anomaly at each time""" description = {f.metric: f.description for f in self.features} - cause_metrics = self.detect_features(machine_id) + cause_metrics = self.find_cause_metrics(machine_id, anomaly.labels) cause_metrics = [ - {'metric': cause[0].metric, - 'label': cause[0].labels, - 'score': cause[1], - 'description': description.get(cause[0].metric, '').format( - cause[0].labels.get('ppid', ''), - cause[0].labels.get('s_port', ''))} + { + 'metric': cause.ts.metric, + 'label': cause.ts.labels, + 'score': cause.score, + 'description': description.get(cause.ts.metric, '').format( + cause.ts.labels.get('ppid', ''), + cause.ts.labels.get('s_port', '')) + } for cause in cause_metrics] + timestamp = dt.utc_now() - template = SysAnomalyTemplate(timestamp, machine_id, anomaly.metric, entity_name) - template.labels = anomaly.labels + template = SysAnomalyTemplate(timestamp, machine_id, anomaly.metric, anomaly.entity_name) self.anomaly_report.sent_anomaly(anomaly, cause_metrics, template) diff --git a/anteater/module/sys_tcp_transmission_detector.py b/anteater/module/sys_tcp_transmission_detector.py index 9af4760..ad383b4 100644 --- a/anteater/module/sys_tcp_transmission_detector.py +++ b/anteater/module/sys_tcp_transmission_detector.py @@ -16,11 +16,13 @@ import math from anteater.core.anomaly import Anomaly from anteater.model.algorithms.spectral_residual import SpectralResidual from anteater.model.slope import smooth_slope +from anteater.model.smoother import conv_smooth +from anteater.model.three_sigma import three_sigma from anteater.module.detector import Detector from anteater.source.anomaly_report import AnomalyReport from anteater.source.metric_loader import MetricLoader from anteater.template.sys_anomaly_template import SysAnomalyTemplate -from anteater.utils.data_load import load_kpi_feature +from anteater.utils.common import divide from anteater.utils.datetime import DateTimeManager as dt from anteater.utils.log import logger @@ -31,40 +33,45 @@ class SysTcpTransmissionDetector(Detector): """ def __init__(self, data_loader: MetricLoader, anomaly_report: AnomalyReport): - super().__init__(data_loader, anomaly_report) - self.kpis, self.features = load_kpi_feature('sys_tcp_transmission.json') + file_name = 'sys_tcp_transmission.json' + super().__init__(data_loader, anomaly_report, file_name) def execute_detect(self, machine_id: str): for kpi in self.kpis: - parameter = kpi.parameter - start, end = dt.last(minutes=10) - time_series_list = self.data_loader.get_metric( - start, end, kpi.metric, label_name='machine_id', label_value=machine_id) + look_back = kpi.params.get('look_back', None) + box_pts = kpi.params.get('box_pts', None) + obs_size = kpi.params.get('obs_size', None) + outlier_ratio_th = kpi.params.get('outlier_ratio_th', None) - if not time_series_list: + start, end = dt.last(minutes=look_back) + ts_list = self.data_loader.\ + get_metric(start, end, kpi.metric, label_name='machine_id', label_value=machine_id) + + if not ts_list: logger.warning(f'Key metric {kpi.metric} is null on the target machine {machine_id}!') return point_count = self.data_loader.expected_point_length(start, end) anomalies = [] - threshold = parameter['threshold'] - for time_series in time_series_list: - if len(time_series.values) < point_count * 0.9 or len(time_series.values) > point_count * 1.5: + for _ts in ts_list: + if len(_ts.values) < point_count * 0.9 or len(_ts.values) > point_count * 1.5: continue - if sum(time_series.values) == 0: + if sum(_ts.values) == 0: continue - score = max(smooth_slope(time_series, windows_length=13)) - - if math.isnan(score) or math.isinf(score): - continue + smoothed_val = conv_smooth(_ts.values, box_pts=box_pts) + outlier, mean, std = three_sigma(smoothed_val, obs_size=obs_size, method='min') + ratio = divide(len(outlier), obs_size) - if score > threshold: + if outlier and ratio >= outlier_ratio_th: + logger.info(f'Ratio: {ratio}, Outlier Ratio TH: {outlier_ratio_th}, ' + f'Mean: {mean}, Std: {std}') anomalies.append( - Anomaly(metric=time_series.metric, - labels=time_series.labels, - score=score, + Anomaly(metric=_ts.metric, + labels=_ts.labels, + score=ratio, + entity_name=kpi.entity_name, description=kpi.description)) anomalies = sorted(anomalies, key=lambda x: x.score, reverse=True) @@ -72,45 +79,45 @@ class SysTcpTransmissionDetector(Detector): if anomalies: logger.info('Sys io latency anomalies was detected.') for anomaly in anomalies: - self.report(anomaly, kpi.entity_name, machine_id) + self.report(anomaly, machine_id) def detect_features(self, machine_id: str, top_n=3): priorities = {f.metric: f.priority for f in self.features} start, end = dt.last(minutes=6) - time_series_list = [] + ts_list = [] for metric in priorities.keys(): - time_series = self.data_loader.get_metric( - start, end, metric, label_name='machine_id', label_value=machine_id) - time_series_list.extend(time_series) + _ts = self.data_loader.\ + get_metric(start, end, metric, label_name='machine_id', label_value=machine_id) + ts_list.extend(_ts) point_count = self.data_loader.expected_point_length(start, end) - sr_model = SpectralResidual(12, 24, 50) + model = SpectralResidual(12, 24, 50) result = [] - for time_series in time_series_list: - if len(time_series.values) < point_count * 0.9 or \ - len(time_series.values) > point_count * 1.5: + for _ts in ts_list: + if len(_ts.values) < point_count * 0.9 or \ + len(_ts.values) > point_count * 1.5: continue - values = time_series.values + values = _ts.values if all(x == values[0] for x in values): continue - scores = sr_model.compute_score(values) + scores = model.compute_score(values) score = max(scores[-13:]) if math.isnan(score) or math.isinf(score): continue - result.append((time_series, score)) + result.append((_ts, score)) result = sorted(result, key=lambda x: x[1], reverse=True)[0: top_n] result = sorted(result, key=lambda x: priorities[x[0].metric]) return result - def report(self, anomaly: Anomaly, entity_name: str, machine_id: str): + def report(self, anomaly: Anomaly, machine_id: str): description = {f.metric: f.description for f in self.features} cause_metrics = self.detect_features(machine_id, top_n=3) cause_metrics = [ @@ -124,6 +131,5 @@ class SysTcpTransmissionDetector(Detector): cause[0].labels.get('server_port', ''))} for cause in cause_metrics] timestamp = dt.utc_now() - template = SysAnomalyTemplate(timestamp, machine_id, anomaly.metric, entity_name) - template.labels = anomaly.labels + template = SysAnomalyTemplate(timestamp, machine_id, anomaly.metric, anomaly.entity_name) self.anomaly_report.sent_anomaly(anomaly, cause_metrics, template) diff --git a/anteater/provider/base.py b/anteater/provider/base.py index eda2d1b..35d1d1f 100644 --- a/anteater/provider/base.py +++ b/anteater/provider/base.py @@ -56,16 +56,16 @@ class TimeSeriesProvider: def fetch(url, params: Dict, **args) -> List: """Fetches data from prometheus server by http request""" try: - response = requests.get(url, params, timeout=30, **args).json() + response = requests.get(url, params, timeout=30, **args) except requests.RequestException as e: logger.error(f"RequestException: {e}!") return [] - + response = response.json() result = [] if response and response.get("status") == 'success': result = response.get('data', {}).get('result', []) else: - logger.error(f"PrometheusAdapter get data failed, " + logger.error(f"Prometheus get data failed, " f"error: {response.get('error')}, query_url: {url}, params: {params}.") return result @@ -87,17 +87,19 @@ class TimeSeriesProvider: data = self.fetch(self.url, params, headers=headers) for item in data: - zipped_values = list(zip(*item.get("values"))) + zipped_values = list(zip(*item.get('values'))) time_stamps = list(zipped_values[0]) values = [float(v) for v in zipped_values[1]] - key = tuple(sorted(item.get("metric").items())) + key = tuple(sorted(item.get('metric').items())) if key in tmp_index: result[tmp_index.get(key)].extend(time_stamps, values) else: + labels = item.get('metric') + labels.pop('__name__', None) time_series = TimeSeries( metric, - item.get("metric"), + labels, time_stamps, values) tmp_index[key] = len(result) diff --git a/anteater/source/anomaly_report.py b/anteater/source/anomaly_report.py index 2205f44..41542b7 100644 --- a/anteater/source/anomaly_report.py +++ b/anteater/source/anomaly_report.py @@ -48,6 +48,7 @@ class AnomalyReport: entity_name = template.entity_name labels = anomaly.labels + template.labels = labels template.entity_id = self.get_entity_id(machine_id, entity_name, labels, keys) template.keys = keys template.description = anomaly.description diff --git a/anteater/template/template.py b/anteater/template/template.py index 52befaa..d86a8cb 100644 --- a/anteater/template/template.py +++ b/anteater/template/template.py @@ -21,8 +21,8 @@ class Template: self.machine_id = machine_id self.metric_id = metric_id self.entity_name = entity_name - self.labels = {} + self.labels = {} self.entity_id = "" self.description = "" self.cause_metrics = {} diff --git a/anteater/utils/common.py b/anteater/utils/common.py index d6c80ab..a99a1ef 100644 --- a/anteater/utils/common.py +++ b/anteater/utils/common.py @@ -170,3 +170,15 @@ def divide(x, y): return x / y else: return 0 + + +def same_intersection_key_value(first: dict, second: dict): + """Checks there are same key value pairs between two dictionaries + intersections by the key + """ + same_keys = set(first.keys()) & set(second.keys()) + for key in same_keys: + if first[key] != second[key]: + return False + + return True diff --git a/config/gala-anteater.yaml b/config/gala-anteater.yaml index b771c16..a01eb47 100644 --- a/config/gala-anteater.yaml +++ b/config/gala-anteater.yaml @@ -3,7 +3,7 @@ Global: is_sys: false Kafka: - server: "9.16.143.92" + server: "localhost" port: "9092" model_topic: "gala_anteater_hybrid_model" model_group_id: "gala_anteater_1" @@ -12,8 +12,8 @@ Kafka: meta_entity_name: "sli" Prometheus: - server: "10.137.17.122" - port: "29090" + server: "localhost" + port: "9090" steps: 5 Aom: diff --git a/config/module/app_sli_rtt.json b/config/module/app_sli_rtt.json index 4372f07..0744416 100644 --- a/config/module/app_sli_rtt.json +++ b/config/module/app_sli_rtt.json @@ -6,9 +6,11 @@ "entity_name": "sli", "enable": true, "description": "sli rtt 异常", - "parameter": { - "min_nsec": 200000000, - "threshold": 0.8 + "params": { + "look_back": 10, + "box_pts": 3, + "obs_size": 25, + "outlier_ratio_th": 0.3 } }, { @@ -17,8 +19,11 @@ "entity_name": "sli", "enable": true, "description": "sli tps 异常", - "parameter": { - "threshold": -0.5 + "params": { + "look_back": 10, + "box_pts": 3, + "obs_size": 25, + "outlier_ratio_th": 0.3 } } ], diff --git a/config/module/proc_io_latency.json b/config/module/proc_io_latency.json index 05bcd81..967ee10 100644 --- a/config/module/proc_io_latency.json +++ b/config/module/proc_io_latency.json @@ -6,8 +6,11 @@ "entity_name": "thread", "enable": true, "description": "Process IO_wait time (unit: us)", - "parameter": { - "threshold": 0.5 + "params": { + "look_back": 10, + "box_pts": 3, + "obs_size": 25, + "outlier_ratio_th": 0.3 } }, { @@ -16,8 +19,11 @@ "entity_name": "proc", "enable": true, "description": "I/O operation delay at the BIO layer (unit: us)", - "parameter": { - "threshold": 0.5 + "params": { + "look_back": 10, + "box_pts": 3, + "obs_size": 25, + "outlier_ratio_th": 0.3 } } ], diff --git a/config/module/sys_io_latency.json b/config/module/sys_io_latency.json index 66e5c5a..f790192 100644 --- a/config/module/sys_io_latency.json +++ b/config/module/sys_io_latency.json @@ -5,9 +5,12 @@ "kpi_type": "", "entity_name": "block", "enable": true, - "description": "Block I/O latency performance", - "parameter": { - "threshold": 0.5 + "description": "Block I/O latency performance is deteriorating!", + "params": { + "look_back": 20, + "box_pts": 7, + "obs_size": 25, + "outlier_ratio_th": 0.3 } } ], diff --git a/config/module/sys_tcp_establish.json b/config/module/sys_tcp_establish.json index d695c2c..b6589ed 100644 --- a/config/module/sys_tcp_establish.json +++ b/config/module/sys_tcp_establish.json @@ -6,7 +6,10 @@ "entity_name": "tcp_link", "enable": true, "description": "RTT of syn packet(us): existing {:.0%} syn packets rtt are more than {:.0f} us", - "parameter": {} + "params": { + "look_back": 30, + "outlier_ratio_th": 0.3 + } } ], "Features": [ diff --git a/config/module/sys_tcp_transmission.json b/config/module/sys_tcp_transmission.json index 5347cc9..522056f 100644 --- a/config/module/sys_tcp_transmission.json +++ b/config/module/sys_tcp_transmission.json @@ -6,8 +6,37 @@ "entity_name": "tcp_link", "enable": true, "description": "Smoothed Round Trip Time(us)", - "parameter": { - "threshold": 0.5 + "params": { + "look_back": 10, + "box_pts": 3, + "obs_size": 25, + "outlier_ratio_th": 0.3 + } + }, + { + "metric": "gala_gopher_net_tcp_in_segs", + "kpi_type": "in_segs", + "entity_name": "tcp_link", + "enable": true, + "description": "Total number of segments received", + "params": { + "look_back": 10, + "box_pts": 3, + "obs_size": 25, + "outlier_ratio_th": 0.3 + } + }, + { + "metric": "gala_gopher_net_tcp_out_segs", + "kpi_type": "out_segs", + "entity_name": "tcp_link", + "enable": true, + "description": "Total number of segments sent", + "params": { + "look_back": 10, + "box_pts": 3, + "obs_size": 25, + "outlier_ratio_th": 0.3 } } ], -- 2.37.0.windows.1