From 8ec215e3c476901b356b7c0e6d271029248db779 Mon Sep 17 00:00:00 2001 From: lizhenxing11 Date: Tue, 22 Nov 2022 16:24:26 +0800 Subject: [PATCH] updates the model and improves cause inference update changelog update prep config update changelog --- add_metrics_anomaly_trends_indicator.patch | 284 +++++ gala-anteater.spec | 10 +- update_sys_io_latency_detector_model.patch | 1215 ++++++++++++++++++++ 3 files changed, 1507 insertions(+), 2 deletions(-) create mode 100644 add_metrics_anomaly_trends_indicator.patch create mode 100644 update_sys_io_latency_detector_model.patch diff --git a/add_metrics_anomaly_trends_indicator.patch b/add_metrics_anomaly_trends_indicator.patch new file mode 100644 index 0000000..882af81 --- /dev/null +++ b/add_metrics_anomaly_trends_indicator.patch @@ -0,0 +1,284 @@ +From ece4a0551bd81f64158ab465a865e31e97b63562 Mon Sep 17 00:00:00 2001 +From: lizhenxing11 +Date: Mon, 21 Nov 2022 14:54:20 +0800 +Subject: [PATCH 2/2] Add Metrics Anomaly Trends Indicator + +update config file +--- + anteater/core/feature.py | 8 +++++ + anteater/model/slope.py | 15 +++++++++ + anteater/module/app_sli_detector.py | 20 ++++++++--- + anteater/utils/data_load.py | 18 ++++++++-- + config/module/app_sli_rtt.json | 52 ++++++++++++++++++++--------- + 5 files changed, 90 insertions(+), 23 deletions(-) + +diff --git a/anteater/core/feature.py b/anteater/core/feature.py +index 306d835..6db764d 100644 +--- a/anteater/core/feature.py ++++ b/anteater/core/feature.py +@@ -12,6 +12,13 @@ + # ******************************************************************************/ + + from dataclasses import dataclass ++from enum import Enum ++ ++ ++class AnomalyTrend(Enum): ++ DEFAULT = 0 ++ RISE = 1 ++ FALL = 2 + + + @dataclass +@@ -19,3 +26,4 @@ class Feature: + metric: str + description: str + priority: int = 0 ++ atrend: AnomalyTrend = AnomalyTrend.DEFAULT +diff --git a/anteater/model/slope.py b/anteater/model/slope.py +index 422d6bc..08c4211 100644 +--- a/anteater/model/slope.py ++++ b/anteater/model/slope.py +@@ -29,3 +29,18 @@ def smooth_slope(time_series, windows_length): + val = conv_smooth(time_series.to_df(), box_pts=13) + val = slope(val, win_len=13) + return val[-windows_length:] ++ ++ ++def trend(y, win_len=None): ++ """Gets the trend for the y""" ++ if not win_len: ++ win_len = len(y) // 2 ++ ++ if np.mean(y[:win_len]) < np.mean(y[-win_len:]): ++ return 1 ++ ++ elif np.mean(y[:win_len]) > np.mean(y[-win_len:]): ++ return -1 ++ ++ else: ++ return 0 +diff --git a/anteater/module/app_sli_detector.py b/anteater/module/app_sli_detector.py +index b69f73c..b63f5e2 100644 +--- a/anteater/module/app_sli_detector.py ++++ b/anteater/module/app_sli_detector.py +@@ -20,7 +20,9 @@ import math + from typing import List + + from anteater.core.anomaly import Anomaly ++from anteater.core.feature import AnomalyTrend + from anteater.model.algorithms.spectral_residual import SpectralResidual ++from anteater.model.slope import trend + from anteater.model.smoother import conv_smooth + from anteater.model.three_sigma import three_sigma + from anteater.module.detector import Detector +@@ -134,10 +136,11 @@ class APPSliDetector(Detector): + + return anomalies + +- def detect_features(self, metrics, machine_id: str, top_n): ++ def detect_features(self, machine_id: str, top_n): ++ metric_atrend = {f.metric: f.atrend for f in self.features} + start, end = dt.last(minutes=6) + time_series_list = [] +- for metric in metrics: ++ for metric in metric_atrend.keys(): + time_series = self.data_loader.get_metric( + start, end, 
metric, label_name='machine_id', label_value=machine_id) + time_series_list.extend(time_series) +@@ -156,8 +159,16 @@ class APPSliDetector(Detector): + if all(x == values[0] for x in values): + continue + ++ if trend(time_series.values) < 0 and \ ++ metric_atrend[time_series.metric] == AnomalyTrend.RISE: ++ continue ++ ++ if trend(time_series.values) > 0 and \ ++ metric_atrend[time_series.metric] == AnomalyTrend.FALL: ++ continue ++ + scores = sr_model.compute_score(values) +- score = max(scores[-13:]) ++ score = max(scores[-25:]) + + if math.isnan(score) or math.isinf(score): + continue +@@ -170,9 +181,8 @@ class APPSliDetector(Detector): + + def report(self, anomaly: Anomaly, machine_id: str): + """Reports a single anomaly at each time""" +- feature_metrics = [f.metric for f in self.features] + description = {f.metric: f.description for f in self.features} +- cause_metrics = self.detect_features(feature_metrics, machine_id, top_n=60) ++ cause_metrics = self.detect_features(machine_id, top_n=60) + cause_metrics = [ + {'metric': cause[0].metric, + 'label': cause[0].labels, +diff --git a/anteater/utils/data_load.py b/anteater/utils/data_load.py +index 108d5ed..f8ce277 100644 +--- a/anteater/utils/data_load.py ++++ b/anteater/utils/data_load.py +@@ -17,7 +17,7 @@ from os import path, sep + from json import JSONDecodeError + from typing import List, Tuple + +-from anteater.core.feature import Feature ++from anteater.core.feature import AnomalyTrend, Feature + from anteater.core.kpi import KPI + from anteater.utils.log import logger + +@@ -76,7 +76,21 @@ def load_kpi_feature(file_name) -> Tuple[List[KPI], List[Feature]]: + raise e + + kpis = [KPI(**param) for param in params.get('KPI')] +- features = [Feature(**param) for param in params.get('Features')] ++ ++ features = [] ++ for param in params.get('Features'): ++ parsed_param = {} ++ for key, value in param.items(): ++ if key == 'atrend': ++ if value.lower() == 'rise': ++ value = AnomalyTrend.RISE ++ elif value.lower() == 'fall': ++ value = AnomalyTrend.FALL ++ else: ++ value = AnomalyTrend.DEFAULT ++ parsed_param[key] = value ++ ++ features.append(Feature(**parsed_param)) + + if duplicated_metric([kpi.metric for kpi in kpis]) or \ + duplicated_metric([f.metric for f in features]): +diff --git a/config/module/app_sli_rtt.json b/config/module/app_sli_rtt.json +index 0744416..b7f78b7 100644 +--- a/config/module/app_sli_rtt.json ++++ b/config/module/app_sli_rtt.json +@@ -34,19 +34,23 @@ + }, + { + "metric": "gala_gopher_block_latency_req_jitter", +- "description": "block层request时延抖动异常" ++ "description": "block层request时延抖动异常", ++ "atrend": "rise" + }, + { + "metric": "gala_gopher_block_latency_req_last", +- "description": "block层request时延最近值异常" ++ "description": "block层request时延最近值异常", ++ "atrend": "rise" + }, + { + "metric": "gala_gopher_block_latency_req_max", +- "description": "block层request时延最大值异常" ++ "description": "block层request时延最大值异常", ++ "atrend": "rise" + }, + { + "metric": "gala_gopher_block_latency_req_sum", +- "description": "block层request时延总计值异常" ++ "description": "block层request时延总计值异常", ++ "atrend": "rise" + }, + { + "metric": "gala_gopher_cpu_iowait_total_second", +@@ -54,11 +58,13 @@ + }, + { + "metric": "gala_gopher_cpu_user_total_second", +- "description": "用户态cpu占用时间(不包括nice)异常" ++ "description": "用户态cpu占用时间(不包括nice)异常", ++ "atrend": "rise" + }, + { + "metric": "gala_gopher_cpu_total_used_per", +- "description": "CPU总利用率异常" ++ "description": "CPU总利用率异常", ++ "atrend": "rise" + }, + { + "metric": 
"gala_gopher_cpu_backlog_drops", +@@ -86,7 +92,8 @@ + }, + { + "metric": "gala_gopher_disk_r_await", +- "description": "读响应时间异常" ++ "description": "读响应时间异常", ++ "atrend": "rise" + }, + { + "metric": "gala_gopher_disk_rareq", +@@ -94,19 +101,23 @@ + }, + { + "metric": "gala_gopher_disk_rspeed", +- "description": "读速率(IOPS)异常" ++ "description": "读速率(IOPS)异常", ++ "atrend": "rise" + }, + { + "metric": "gala_gopher_disk_rspeed_kB", +- "description": "读吞吐量异常" ++ "description": "读吞吐量异常", ++ "atrend": "rise" + }, + { + "metric": "gala_gopher_disk_util", +- "description": "磁盘使用率异常" ++ "description": "磁盘使用率异常", ++ "atrend": "rise" + }, + { + "metric": "gala_gopher_disk_w_await", +- "description": "写响应时间异常" ++ "description": "写响应时间异常", ++ "atrend": "rise" + }, + { + "metric": "gala_gopher_disk_wareq", +@@ -114,19 +125,23 @@ + }, + { + "metric": "gala_gopher_disk_wspeed", +- "description": "写速率(IOPS)异常" ++ "description": "写速率(IOPS)异常", ++ "atrend": "rise" + }, + { + "metric": "gala_gopher_disk_wspeed_kB", +- "description": "写吞吐量异常" ++ "description": "写吞吐量异常", ++ "atrend": "rise" + }, + { + "metric": "gala_gopher_proc_read_bytes", +- "description": "进程实际从磁盘读取的字节数异常" ++ "description": "进程实际从磁盘读取的字节数异常", ++ "atrend": "rise" + }, + { + "metric": "gala_gopher_proc_write_bytes", +- "description": "进程实际从磁盘写入的字节数异常" ++ "description": "进程实际从磁盘写入的字节数异常", ++ "atrend": "rise" + }, + { + "metric": "gala_gopher_net_tcp_retrans_segs", +@@ -134,7 +149,12 @@ + }, + { + "metric": "gala_gopher_tcp_link_lost_out", +- "description": "TPC丢包数异常" ++ "description": "TCP丢包数异常" ++ }, ++ { ++ "metric": "gala_gopher_tcp_link_srtt", ++ "description": "TCP超时", ++ "atrend": "rise" + }, + { + "metric": "gala_gopher_tcp_link_notack_bytes", +-- +2.37.0.windows.1 + diff --git a/gala-anteater.spec b/gala-anteater.spec index 4b2b123..64f4e48 100644 --- a/gala-anteater.spec +++ b/gala-anteater.spec @@ -2,7 +2,7 @@ Name: gala-anteater Version: 1.0.0 -Release: 1 +Release: 2 Summary: A time-series anomaly detection platform for operating system. 
License: MulanPSL2
URL: https://gitee.com/openeuler/gala-anteater
@@ -11,6 +11,9 @@ BuildRoot: %{_builddir}/%{name}-%{version}
BuildRequires: procps-ng python3-setuptools
Requires: python3-gala-anteater = %{version}-%{release}
+Patch0: update_sys_io_latency_detector_model.patch
+Patch1: add_metrics_anomaly_trends_indicator.patch
+
%description
Abnormal detection module for A-Ops project
@@ -23,7 +26,7 @@ Requires: python3-pandas python3-requests python3-scikit-learn python3-py
Python3 package of gala-anteater
%prep
-%setup -q
+%autosetup -n %{name}-%{version} -p1
%build
%py3_build
@@ -56,5 +59,8 @@ Python3 package of gala-anteater
%changelog
+* Tue Nov 22 2022 Li Zhenxing - 1.0.0-2
+- Updates anomaly detection model and improves cause inference result
+
* Sat Nov 12 2022 Zhen Chen - 1.0.0-1
- Package init
diff --git a/update_sys_io_latency_detector_model.patch b/update_sys_io_latency_detector_model.patch
new file mode 100644
index 0000000..09dd280
--- /dev/null
+++ b/update_sys_io_latency_detector_model.patch
@@ -0,0 +1,1215 @@
+From 0b2243b61fe5083784e634db0d97c2888330eb0a Mon Sep 17 00:00:00 2001
+From: lizhenxing11
+Date: Mon, 14 Nov 2022 19:43:17 +0800
+Subject: [PATCH 1/2] Update sys io latency detector model
+
+Add tcp detector and update model
+
+fix code check issue
+---
+ anteater/core/anomaly.py | 8 ++
+ anteater/core/kpi.py | 2 +-
+ anteater/model/three_sigma.py | 37 ++++++
+ anteater/module/app_sli_detector.py | 93 ++++++++-------
+ anteater/module/detector.py | 54 +++++++--
+ anteater/module/proc_io_latency_detector.py | 49 +++++---
+ anteater/module/sys_io_latency_detector.py | 109 +++++++++---------
+ anteater/module/sys_tcp_establish_detector.py | 108 +++++++++--------
+ .../module/sys_tcp_transmission_detector.py | 76 ++++++------
+ anteater/provider/base.py | 14 ++-
+ anteater/source/anomaly_report.py | 1 +
+ anteater/template/template.py | 2 +-
+ anteater/utils/common.py | 12 ++
+ config/gala-anteater.yaml | 6 +-
+ config/module/app_sli_rtt.json | 15 ++-
+ config/module/proc_io_latency.json | 14 ++-
+ config/module/sys_io_latency.json | 9 +-
+ config/module/sys_tcp_establish.json | 5 +-
+ config/module/sys_tcp_transmission.json | 33 +++++-
+ 19 files changed, 416 insertions(+), 231 deletions(-)
+ create mode 100644 anteater/model/three_sigma.py
+
+diff --git a/anteater/core/anomaly.py b/anteater/core/anomaly.py
+index cce7767..b95eeab 100644
+--- a/anteater/core/anomaly.py
++++ b/anteater/core/anomaly.py
+@@ -13,6 +13,8 @@
+
+ from dataclasses import dataclass
+
++from anteater.utils.time_series import TimeSeries
++
+
+ @dataclass
+ class Anomaly:
+@@ -21,3 +23,9 @@ class Anomaly:
+     score: float = None
+     entity_name: str = None
+     description: str = None
++
++
++@dataclass
++class CauseMetric:
++    ts: TimeSeries
++    score: float
+diff --git a/anteater/core/kpi.py b/anteater/core/kpi.py
+index db4c046..620ffdf 100644
+--- a/anteater/core/kpi.py
++++ b/anteater/core/kpi.py
+@@ -21,5 +21,5 @@ class KPI:
+     entity_name: str = None
+     enable: bool = False
+     description: str = ""
+-    parameter: dict = field(default=dict)
++    params: dict = field(default=dict)
+
+diff --git a/anteater/model/three_sigma.py b/anteater/model/three_sigma.py
+new file mode 100644
+index 0000000..08d05ba
+--- /dev/null
++++ b/anteater/model/three_sigma.py
+@@ -0,0 +1,37 @@
++#!/usr/bin/python3
++# ******************************************************************************
++# Copyright (c) 2022 Huawei Technologies Co., Ltd.
++# gala-anteater is licensed under Mulan PSL v2.
++# You can use this software according to the terms and conditions of the Mulan PSL v2.
++# You may obtain a copy of Mulan PSL v2 at:
++#     http://license.coscl.org.cn/MulanPSL2
++# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
++# EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
++# MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
++# See the Mulan PSL v2 for more details.
++# ******************************************************************************/
++
++
++import numpy as np
++
++
++def three_sigma(values, obs_size, method="abs"):
++    """The '3-sigma rule' outlier detection function"""
++    if obs_size <= 0:
++        raise ValueError("The obs_size should be greater than zero!")
++    if len(values) <= obs_size:
++        raise ValueError("The values' length should be greater than obs_size!")
++    train_val = values[:-obs_size]
++    obs_val = values[-obs_size:]
++    mean = np.mean(train_val)
++    std = np.std(train_val)
++    if method == "abs":
++        outlier = [val for val in obs_val if abs(val - mean) > 3 * std]
++    elif method == 'min':
++        outlier = [val for val in obs_val if val < mean - 3 * std]
++    elif method == 'max':
++        outlier = [val for val in obs_val if val > mean + 3 * std]
++    else:
++        raise ValueError(f'Unknown method {method}')
++
++    return outlier, mean, std
+diff --git a/anteater/module/app_sli_detector.py b/anteater/module/app_sli_detector.py
+index e38d53e..b69f73c 100644
+--- a/anteater/module/app_sli_detector.py
++++ b/anteater/module/app_sli_detector.py
+@@ -19,17 +19,15 @@ Description: The anomaly detector implementation on APP Sli
+ import math
+ from typing import List
+
+-import numpy as np
+-
+ from anteater.core.anomaly import Anomaly
+ from anteater.model.algorithms.spectral_residual import SpectralResidual
+-from anteater.model.slope import smooth_slope
+ from anteater.model.smoother import conv_smooth
++from anteater.model.three_sigma import three_sigma
+ from anteater.module.detector import Detector
+ from anteater.source.anomaly_report import AnomalyReport
+ from anteater.source.metric_loader import MetricLoader
+ from anteater.template.app_anomaly_template import AppAnomalyTemplate
+-from anteater.utils.data_load import load_kpi_feature
++from anteater.utils.common import divide
+ from anteater.utils.datetime import DateTimeManager as dt
+ from anteater.utils.log import logger
+
+@@ -40,48 +38,52 @@ class APPSliDetector(Detector):
+     """
+
+     def __init__(self, data_loader: MetricLoader, anomaly_report: AnomalyReport):
+-        super().__init__(data_loader, anomaly_report)
+-        self.kpis, self.features = load_kpi_feature('app_sli_rtt.json')
++        file_name = 'app_sli_rtt.json'
++        super().__init__(data_loader, anomaly_report, file_name)
+
+     def execute_detect(self, machine_id: str):
+         for kpi in self.kpis:
+-            parameter = kpi.parameter
+             if kpi.kpi_type == 'rtt':
+-                anomalies = self.detect_rtt(kpi, machine_id, parameter)
++                anomalies = self.detect_rtt(kpi, machine_id)
+             else:
+-                anomalies = self.detect_tps(kpi, machine_id, parameter)
++                anomalies = self.detect_tps(kpi, machine_id)
+
+             for anomaly in anomalies:
+-                self.report(anomaly, kpi.entity_name, machine_id)
++                self.report(anomaly, machine_id)
+
+-    def detect_rtt(self, kpi, machine_id: str, parameter: dict) -> List[Anomaly]:
++    def detect_rtt(self, kpi, machine_id: str) -> List[Anomaly]:
+         """Detects rtt by rule-based model"""
+-        start, end = dt.last(minutes=10)
+-        time_series_list = self.data_loader.get_metric(
++        look_back = kpi.params.get('look_back', None)
++        box_pts = kpi.params.get('box_pts',
None) ++ obs_size = kpi.params.get('obs_size', None) ++ outlier_ratio_th = kpi.params.get('outlier_ratio_th', None) ++ ++ start, end = dt.last(minutes=look_back) ++ ts_list = self.data_loader.get_metric( + start, end, kpi.metric, label_name='machine_id', label_value=machine_id) + +- if not time_series_list: ++ if not ts_list: + logger.warning(f'Key metric {kpi.metric} is null on the target machine {machine_id}!') + return [] + + point_count = self.data_loader.expected_point_length(start, end) + anomalies = [] +- threshold = parameter['threshold'] +- min_nsec = parameter['min_nsec'] +- for time_series in time_series_list: +- if len(time_series.values) < point_count * 0.9 or len(time_series.values) > point_count * 1.5: ++ for _ts in ts_list: ++ if len(_ts.values) < point_count * 0.9 or len(_ts.values) > point_count * 1.5: + continue + +- score = max(smooth_slope(time_series, windows_length=13)) +- if math.isnan(score) or math.isinf(score): +- continue ++ smoothed_val = conv_smooth(_ts.values, box_pts=box_pts) ++ outlier, mean, std = three_sigma(smoothed_val, obs_size=obs_size, method='min') ++ ratio = divide(len(outlier), obs_size) + +- avg_nsec = np.mean(time_series.values[-13:]) +- if score >= threshold and avg_nsec >= min_nsec: ++ if outlier and ratio >= outlier_ratio_th: ++ logger.info(f'Ratio: {ratio}, Outlier Ratio TH: {outlier_ratio_th}, ' ++ f'Mean: {mean}, Std: {std}') + anomalies.append( +- Anomaly(metric=time_series.metric, +- labels=time_series.labels, +- score=score, ++ Anomaly(metric=_ts.metric, ++ labels=_ts.labels, ++ score=ratio, ++ entity_name=kpi.entity_name, + description=kpi.description)) + + anomalies = sorted(anomalies, key=lambda x: x.score, reverse=True) +@@ -91,9 +93,14 @@ class APPSliDetector(Detector): + + return anomalies + +- def detect_tps(self, kpi, machine_id: str, parameter: dict) -> List[Anomaly]: ++ def detect_tps(self, kpi, machine_id: str) -> List[Anomaly]: + """Detects tps by rule based model""" +- start, end = dt.last(minutes=10) ++ look_back = kpi.params.get('look_back', None) ++ box_pts = kpi.params.get('box_pts', None) ++ obs_size = kpi.params.get('obs_size', None) ++ outlier_ratio_th = kpi.params.get('outlier_ratio_th', None) ++ ++ start, end = dt.last(minutes=look_back) + time_series_list = self.data_loader.get_metric( + start, end, kpi.metric, label_name='machine_id', label_value=machine_id) + +@@ -103,21 +110,21 @@ class APPSliDetector(Detector): + + point_count = self.data_loader.expected_point_length(start, end) + anomalies = [] +- threshold = parameter['threshold'] +- for time_series in time_series_list: +- if len(time_series.values) < point_count * 0.9 or len(time_series.values) > point_count * 1.5: ++ for _ts in time_series_list: ++ if len(_ts.values) < point_count * 0.9 or len(_ts.values) > point_count * 1.5: + continue +- pre_values = time_series.values[:-25] +- cur_values = time_series.values[-25:] +- mean = np.mean(pre_values) +- std = np.std(pre_values) +- outlier = [val for val in cur_values if val < mean - 3 * std] ++ smoothed_val = conv_smooth(_ts.values, box_pts=box_pts) ++ outlier, mean, std = three_sigma(smoothed_val, obs_size=obs_size, method='min') ++ ratio = divide(len(outlier), obs_size) + +- if outlier and len(outlier) >= len(cur_values) * 0.3: ++ if outlier and ratio >= outlier_ratio_th: ++ logger.info(f'Ratio: {ratio}, Outlier Ratio TH: {outlier_ratio_th}, ' ++ f'Mean: {mean}, Std: {std}') + anomalies.append( +- Anomaly(metric=time_series.metric, +- labels=time_series.labels, +- score=1, ++ Anomaly(metric=_ts.metric, ++ 
labels=_ts.labels, ++ score=ratio, ++ entity_name=kpi.entity_name, + description=kpi.description)) + + anomalies = sorted(anomalies, key=lambda x: x.score, reverse=True) +@@ -161,7 +168,8 @@ class APPSliDetector(Detector): + + return result[0: top_n] + +- def report(self, anomaly: Anomaly, entity_name: str, machine_id: str): ++ def report(self, anomaly: Anomaly, machine_id: str): ++ """Reports a single anomaly at each time""" + feature_metrics = [f.metric for f in self.features] + description = {f.metric: f.description for f in self.features} + cause_metrics = self.detect_features(feature_metrics, machine_id, top_n=60) +@@ -172,6 +180,5 @@ class APPSliDetector(Detector): + 'description': description.get(cause[0].metric, '')} + for cause in cause_metrics] + timestamp = dt.utc_now() +- template = AppAnomalyTemplate(timestamp, machine_id, anomaly.metric, entity_name) +- template.labels = anomaly.labels ++ template = AppAnomalyTemplate(timestamp, machine_id, anomaly.metric, anomaly.entity_name) + self.anomaly_report.sent_anomaly(anomaly, cause_metrics, template) +diff --git a/anteater/module/detector.py b/anteater/module/detector.py +index 51dabbd..bfe516e 100644 +--- a/anteater/module/detector.py ++++ b/anteater/module/detector.py +@@ -11,37 +11,69 @@ + # See the Mulan PSL v2 for more details. + # ******************************************************************************/ + +-from abc import abstractmethod ++from abc import abstractmethod, ABC + + from anteater.core.anomaly import Anomaly + from anteater.source.anomaly_report import AnomalyReport + from anteater.source.metric_loader import MetricLoader ++from anteater.utils.common import same_intersection_key_value ++from anteater.utils.data_load import load_kpi_feature + from anteater.utils.datetime import DateTimeManager as dt + from anteater.utils.log import logger + from anteater.utils.timer import timer + + +-class Detector: +- """The base detector class""" +- def __init__(self, data_loader: MetricLoader, anomaly_report: AnomalyReport): ++class Detector(ABC): ++ """The anomaly detector base class""" ++ def __init__( ++ self, ++ data_loader: MetricLoader, ++ anomaly_report: AnomalyReport, ++ file_name: str): ++ """The detector base class initializer""" + self.data_loader = data_loader + self.anomaly_report = anomaly_report ++ self.kpis, self.features = load_kpi_feature(file_name) ++ ++ @staticmethod ++ def filter_ts(ts_list, filters): ++ result = [] ++ for _ts in ts_list: ++ if same_intersection_key_value(_ts.labels, filters): ++ result.append(_ts) ++ ++ return result + + @abstractmethod + def execute_detect(self, machine_id): ++ """Executes anomaly detection on specified machine id""" ++ pass ++ ++ @abstractmethod ++ def report(self, anomaly: Anomaly, machine_id: str): ++ """Reports a single anomaly at each time""" + pass + + @timer + def detect(self): ++ """The main function of detector""" ++ if not self.kpis: ++ logger.debug(f"Null kpis in detector: {self.__class__.__name__}!") ++ return ++ + logger.info(f"Run detector: {self.__class__.__name__}!") +- start, end = dt.last(minutes=1) +- metrics_kpi = [k.metric for k in self.kpis] +- metrics_feat = [f.metric for f in self.features] +- metrics = metrics_kpi + metrics_feat +- machine_ids = self.data_loader.get_unique_machines(start, end, metrics) ++ self.pre_process() ++ machine_ids = self.get_unique_machine_id() + for _id in machine_ids: + self.execute_detect(_id) + +- @abstractmethod +- def report(self, anomaly: Anomaly, entity_name: str, machine_id: str): ++ def pre_process(self): 
++ """Executes pre-process for generating necessary parameters""" + pass ++ ++ def get_unique_machine_id(self): ++ """Gets unique machine ids during past minutes""" ++ start, end = dt.last(minutes=1) ++ metrics = [_kpi.metric for _kpi in self.kpis] ++ machine_ids = self.data_loader.get_unique_machines(start, end, metrics) ++ return machine_ids +diff --git a/anteater/module/proc_io_latency_detector.py b/anteater/module/proc_io_latency_detector.py +index ee1d7c6..3ea2c51 100644 +--- a/anteater/module/proc_io_latency_detector.py ++++ b/anteater/module/proc_io_latency_detector.py +@@ -16,11 +16,13 @@ import math + from anteater.core.anomaly import Anomaly + from anteater.model.algorithms.spectral_residual import SpectralResidual + from anteater.model.slope import smooth_slope ++from anteater.model.smoother import conv_smooth ++from anteater.model.three_sigma import three_sigma + from anteater.module.detector import Detector + from anteater.source.anomaly_report import AnomalyReport + from anteater.source.metric_loader import MetricLoader + from anteater.template.sys_anomaly_template import SysAnomalyTemplate +-from anteater.utils.data_load import load_kpi_feature ++from anteater.utils.common import divide + from anteater.utils.datetime import DateTimeManager as dt + from anteater.utils.log import logger + +@@ -31,40 +33,50 @@ class ProcIOLatencyDetector(Detector): + """ + + def __init__(self, data_loader: MetricLoader, anomaly_report: AnomalyReport): +- super().__init__(data_loader, anomaly_report) +- self.kpis, self.features = load_kpi_feature('proc_io_latency.json') ++ file_name = 'proc_io_latency.json' ++ super().__init__(data_loader, anomaly_report, file_name) + + def execute_detect(self, machine_id): + for kpi in self.kpis: +- parameter = kpi.parameter +- start, end = dt.last(minutes=10) +- time_series_list = self.data_loader.get_metric( ++ look_back = kpi.params.get('look_back', None) ++ box_pts = kpi.params.get('box_pts', None) ++ obs_size = kpi.params.get('obs_size', None) ++ outlier_ratio_th = kpi.params.get('outlier_ratio_th', None) ++ ++ start, end = dt.last(minutes=look_back) ++ ts_list = self.data_loader.get_metric( + start, end, kpi.metric, label_name='machine_id', label_value=machine_id) + +- if not time_series_list: ++ if not ts_list: + logger.warning(f'Key metric {kpi.metric} is null on the target machine {machine_id}!') + return + + point_count = self.data_loader.expected_point_length(start, end) + anomalies = [] +- threshold = parameter['threshold'] +- for time_series in time_series_list: +- if len(time_series.values) < point_count * 0.9 or len(time_series.values) > point_count * 1.5: ++ for _ts in ts_list: ++ if len(_ts.values) < point_count * 0.9 or len(_ts.values) > point_count * 1.5: + continue + +- if sum(time_series.values) == 0: ++ if sum(_ts.values) == 0: + continue + +- score = max(smooth_slope(time_series, windows_length=13)) ++ score = max(smooth_slope(_ts, windows_length=13)) + + if math.isnan(score) or math.isinf(score): + continue + +- if score > threshold: ++ smoothed_val = conv_smooth(_ts.values, box_pts=box_pts) ++ outlier, mean, std = three_sigma(smoothed_val, obs_size=obs_size, method='min') ++ ratio = divide(len(outlier), obs_size) ++ ++ if outlier and ratio >= outlier_ratio_th: ++ logger.info(f'Ratio: {ratio}, Outlier Ratio TH: {outlier_ratio_th}, ' ++ f'Mean: {mean}, Std: {std}') + anomalies.append( +- Anomaly(metric=time_series.metric, +- labels=time_series.labels, ++ Anomaly(metric=_ts.metric, ++ labels=_ts.labels, + score=score, ++ 
entity_name=kpi.entity_name, + description=kpi.description)) + + anomalies = sorted(anomalies, key=lambda x: x.score, reverse=True) +@@ -72,7 +84,7 @@ class ProcIOLatencyDetector(Detector): + if anomalies: + logger.info('Sys io latency anomalies was detected.') + for anomaly in anomalies: +- self.report(anomaly, kpi.entity_name, machine_id) ++ self.report(anomaly, machine_id) + + def detect_features(self, machine_id: str, top_n=3): + priorities = {f.metric: f.priority for f in self.features} +@@ -110,7 +122,7 @@ class ProcIOLatencyDetector(Detector): + + return result + +- def report(self, anomaly: Anomaly, entity_name: str, machine_id: str): ++ def report(self, anomaly: Anomaly, machine_id: str): + description = {f.metric: f.description for f in self.features} + cause_metrics = self.detect_features(machine_id, top_n=3) + cause_metrics = [ +@@ -123,6 +135,5 @@ class ProcIOLatencyDetector(Detector): + cause[0].labels.get('comm', ''))} + for cause in cause_metrics] + timestamp = dt.utc_now() +- template = SysAnomalyTemplate(timestamp, machine_id, anomaly.metric, entity_name) +- template.labels = anomaly.labels ++ template = SysAnomalyTemplate(timestamp, machine_id, anomaly.metric, anomaly.entity_name) + self.anomaly_report.sent_anomaly(anomaly, cause_metrics, template) +diff --git a/anteater/module/sys_io_latency_detector.py b/anteater/module/sys_io_latency_detector.py +index f459a57..c29ce72 100644 +--- a/anteater/module/sys_io_latency_detector.py ++++ b/anteater/module/sys_io_latency_detector.py +@@ -15,12 +15,13 @@ import math + + from anteater.core.anomaly import Anomaly + from anteater.model.algorithms.spectral_residual import SpectralResidual +-from anteater.model.slope import smooth_slope ++from anteater.model.smoother import conv_smooth ++from anteater.model.three_sigma import three_sigma + from anteater.module.detector import Detector + from anteater.source.anomaly_report import AnomalyReport + from anteater.source.metric_loader import MetricLoader + from anteater.template.sys_anomaly_template import SysAnomalyTemplate +-from anteater.utils.data_load import load_kpi_feature ++from anteater.utils.common import divide, same_intersection_key_value + from anteater.utils.datetime import DateTimeManager as dt + from anteater.utils.log import logger + +@@ -31,100 +32,104 @@ class SysIOLatencyDetector(Detector): + """ + + def __init__(self, data_loader: MetricLoader, anomaly_report: AnomalyReport): +- super().__init__(data_loader, anomaly_report) +- self.kpis, self.features = load_kpi_feature('sys_io_latency.json') ++ """The system i/o latency detector initializer""" ++ file_name = 'sys_io_latency.json' ++ super().__init__(data_loader, anomaly_report, file_name) + + def execute_detect(self, machine_id: str): ++ """Executes the detector based on machine id""" + kpi = self.kpis[0] +- parameter = kpi.parameter ++ look_back = kpi.params.get('look_back', None) ++ box_pts = kpi.params.get('box_pts', None) ++ obs_size = kpi.params.get('obs_size', None) ++ outlier_ratio_th = kpi.params.get('outlier_ratio_th', None) + +- start, end = dt.last(minutes=10) +- time_series_list = self.data_loader.get_metric( +- start, end, kpi.metric, label_name='machine_id', label_value=machine_id) ++ start, end = dt.last(minutes=look_back) ++ ts_list = self.data_loader.\ ++ get_metric(start, end, kpi.metric, label_name='machine_id', label_value=machine_id) + +- if not time_series_list: ++ if not ts_list: + logger.warning(f'Key metric {kpi.metric} is null on the target machine {machine_id}!') + return + + point_count = 
self.data_loader.expected_point_length(start, end)
+ anomalies = []
+-        threshold = parameter['threshold']
+-        for time_series in time_series_list:
+-            if len(time_series.values) < point_count * 0.9 or len(time_series.values) > point_count * 1.5:
++        for _ts in ts_list:
++            if len(_ts.values) < point_count * 0.9 or len(_ts.values) > point_count * 1.5:
+                 continue
+
+-            if sum(time_series.values) == 0:
++            if sum(_ts.values) == 0:
+                 continue
+
+-            score = max(smooth_slope(time_series, windows_length=13))
++            smoothed_val = conv_smooth(_ts.values, box_pts=box_pts)
++            outlier, mean, std = three_sigma(smoothed_val, obs_size=obs_size, method='max')
++            ratio = divide(len(outlier), obs_size)
+
+-            if math.isnan(score) or math.isinf(score):
+-                continue
+-
+-            if score > threshold:
++            if outlier and ratio >= outlier_ratio_th:
++                logger.info(f'Ratio: {ratio}, Outlier Ratio TH: {outlier_ratio_th}, '
++                            f'Mean: {mean}, Std: {std}')
+                 anomalies.append(
+-                    Anomaly(metric=time_series.metric,
+-                            labels=time_series.labels,
+-                            score=score,
++                    Anomaly(metric=_ts.metric,
++                            labels=_ts.labels,
++                            score=ratio,
++                            entity_name=kpi.entity_name,
+                             description=kpi.description))
+
+-        anomalies = sorted(anomalies, key=lambda x: x.score, reverse=True)
+-
+         if anomalies:
+             logger.info('Sys io latency anomalies was detected.')
++            anomalies = sorted(anomalies, key=lambda x: x.score, reverse=True)
+             for anomaly in anomalies:
+-                self.report(anomaly, kpi.entity_name, machine_id)
++                self.report(anomaly, machine_id)
+
+-    def detect_features(self, machine_id: str, top_n: int):
++    def find_cause_metrics(self, machine_id: str, filters: dict, top_n: int):
++        """Detects the abnormal features and reports the cause metrics"""
+         priorities = {f.metric: f.priority for f in self.features}
+         start, end = dt.last(minutes=6)
+-        time_series_list = []
++        ts_list = []
+         for metric in priorities.keys():
+-            time_series = self.data_loader.get_metric(
++            _ts_list = self.data_loader.get_metric(
+                 start, end, metric, label_name='machine_id', label_value=machine_id)
+-            time_series_list.extend(time_series)
++            for _ts in _ts_list:
++                if same_intersection_key_value(_ts.labels, filters):
++                    ts_list.append(_ts)
+
+         point_count = self.data_loader.expected_point_length(start, end)
+-        sr_model = SpectralResidual(12, 24, 50)
+-
++        model = SpectralResidual(12, 24, 50)
+         result = []
+-        for time_series in time_series_list:
+-            if len(time_series.values) < point_count * 0.9 or \
+-                    len(time_series.values) > point_count * 1.5:
++        for _ts in ts_list:
++            if len(_ts.values) < point_count * 0.9 or \
++                    len(_ts.values) > point_count * 1.5:
+                 continue
+
+-            values = time_series.values
+-
+-            if all(x == values[0] for x in values):
++            if all(x == _ts.values[0] for x in _ts.values):
+                 continue
+
+-            scores = sr_model.compute_score(values)
+-            score = max(scores[-13:])
+-
++            score = max(model.compute_score(_ts.values)[-13:])
+             if math.isnan(score) or math.isinf(score):
+                 continue
+
+-            result.append((time_series, score))
++            result.append((_ts, score))
+
+         result = sorted(result, key=lambda x: x[1], reverse=True)[0: top_n]
+         result = sorted(result, key=lambda x: priorities[x[0].metric])
+
+         return result
+
+-    def report(self, anomaly: Anomaly, entity_name: str, machine_id: str):
+-        feature_metrics = [f.metric for f in self.features]
++    def report(self, anomaly: Anomaly, machine_id: str):
++        """Reports the anomaly with its cause metrics"""
+         description = {f.metric: f.description for f in self.features}
+-        cause_metrics = self.detect_features(machine_id, top_n=3)
++        cause_metrics = 
self.find_cause_metrics(machine_id, anomaly.labels, top_n=3) + cause_metrics = [ +- {'metric': cause[0].metric, +- 'label': cause[0].labels, +- 'score': cause[1], +- 'description': description.get(cause[0].metric, '').format( +- cause[0].labels.get('disk_name', ''), +- cause[0].labels.get('tgid', ''), +- cause[0].labels.get('comm', ''))} ++ { ++ 'metric': cause[0].metric, ++ 'label': cause[0].labels, ++ 'score': cause[1], ++ 'description': description.get(cause[0].metric, '').format( ++ cause[0].labels.get('disk_name', ''), ++ cause[0].labels.get('tgid', ''), ++ cause[0].labels.get('comm', '')) ++ } + for cause in cause_metrics] + timestamp = dt.utc_now() +- template = SysAnomalyTemplate(timestamp, machine_id, anomaly.metric, entity_name) +- template.labels = anomaly.labels ++ template = SysAnomalyTemplate(timestamp, machine_id, anomaly.metric, anomaly.entity_name) + self.anomaly_report.sent_anomaly(anomaly, cause_metrics, template) +diff --git a/anteater/module/sys_tcp_establish_detector.py b/anteater/module/sys_tcp_establish_detector.py +index 4b49b25..3ca61c5 100644 +--- a/anteater/module/sys_tcp_establish_detector.py ++++ b/anteater/module/sys_tcp_establish_detector.py +@@ -11,15 +11,17 @@ + # See the Mulan PSL v2 for more details. + # ******************************************************************************/ + ++from functools import reduce ++from typing import List ++ + import numpy as np + +-from anteater.core.anomaly import Anomaly ++from anteater.core.anomaly import Anomaly, CauseMetric + from anteater.module.detector import Detector + from anteater.source.anomaly_report import AnomalyReport + from anteater.source.metric_loader import MetricLoader + from anteater.template.sys_anomaly_template import SysAnomalyTemplate +-from anteater.utils.common import divide +-from anteater.utils.data_load import load_kpi_feature ++from anteater.utils.common import divide, same_intersection_key_value + from anteater.utils.datetime import DateTimeManager as dt + from anteater.utils.log import logger + +@@ -36,69 +38,85 @@ class SysTcpEstablishDetector(Detector): + """ + + def __init__(self, data_loader: MetricLoader, anomaly_report: AnomalyReport): +- super().__init__(data_loader, anomaly_report) +- self.kpis, self.features = load_kpi_feature('sys_tcp_establish.json') ++ file_name = 'sys_tcp_establish.json' ++ super().__init__(data_loader, anomaly_report, file_name) + +- def execute_detect(self, machine_id: str): ++ self.mean = None ++ self.std = None ++ ++ def pre_process(self): ++ """Calculates ts values mean and std""" + kpi = self.kpis[0] +- start_30_minutes, _ = dt.last(minutes=30) +- start_3_minutes, end = dt.last(minutes=3) ++ look_back = kpi.params.get('look_back', None) + +- pre_ts = self.data_loader.get_metric( +- start_30_minutes, start_3_minutes, kpi.metric, label_name='machine_id', label_value=machine_id) +- pre_establish_time = [t.values[0] for t in pre_ts if t.values] ++ start, _ = dt.last(minutes=look_back) ++ mid, _ = dt.last(minutes=3) + +- ts = self.data_loader.get_metric( +- start_3_minutes, end, kpi.metric, label_name='machine_id', label_value=machine_id) +- establish_time = [t.values[0] for t in ts if t.values] ++ ts_list = self.data_loader.get_metric(start, mid, kpi.metric) ++ establish_time = reduce(lambda x, y: x + y, [list(set(_ts.values)) for _ts in ts_list]) + +- mean = np.mean(pre_establish_time) +- std = np.std(pre_establish_time) ++ self.mean = np.mean(establish_time) ++ self.std = np.std(establish_time) + +- outlier = [val for val in establish_time if abs(val 
- mean) > 3 * std]
++    def execute_detect(self, machine_id: str):
++        """Executes the detector based on machine id"""
++        kpi = self.kpis[0]
++        outlier_ratio_th = kpi.params.get('outlier_ratio_th', None)
+
+-        if outlier and len(outlier) > len(ts) * 0.3:
++        start, end = dt.last(minutes=3)
++        ts_list = self.data_loader. \
++            get_metric(start, end, kpi.metric, label_name='machine_id', label_value=machine_id)
++        establish_time = reduce(lambda x, y: x + y, [list(set(_ts.values)) for _ts in ts_list])
++
++        outlier = [val for val in establish_time if abs(val - self.mean) > 3 * self.std]
++        ratio = divide(len(outlier), len(establish_time))
++        if outlier and ratio > outlier_ratio_th:
++            logger.info(f'Ratio: {ratio}, Outlier Ratio TH: {outlier_ratio_th}, '
++                        f'Mean: {self.mean}, Std: {self.std}')
+             logger.info('Sys tcp establish anomalies was detected.')
+-            if establish_time:
+-                percentile = divide(len(outlier), len(establish_time))
+-            else:
+-                percentile = 0
+             anomaly = Anomaly(
+                 metric=kpi.metric,
+                 labels={},
+-                description=kpi.description.format(percentile, min(outlier)))
+-            self.report(anomaly, kpi.entity_name, machine_id)
++                entity_name=kpi.entity_name,
++                description=kpi.description.format(ratio, min(outlier)))
++            self.report(anomaly, machine_id)
+
+-    def detect_features(self, machine_id: str):
++    def find_cause_metrics(self, machine_id: str, filters: dict) -> List[CauseMetric]:
++        """Detects the abnormal features and reports the cause metrics"""
++        priorities = {f.metric: f.priority for f in self.features}
+         start, end = dt.last(minutes=3)
+-        time_series_list = []
+-        metrics = [f.metric for f in self.features]
+-        for metric in metrics:
+-            time_series = self.data_loader.get_metric(
+-                start, end, metric, label_name='machine_id', label_value=machine_id)
+-            time_series_list.extend(time_series)
++        ts_list = []
++        for metric in priorities.keys():
++            _ts_list = self.data_loader.\
++                get_metric(start, end, metric, label_name='machine_id', label_value=machine_id)
++            filtered_ts_list = self.filter_ts(_ts_list, filters)
++            ts_list.extend(filtered_ts_list)
+
+         result = []
+-        for ts in time_series_list:
+-            if ts.values and max(ts.values) > 0:
+-                result.append((ts, max(ts.values)))
++        for _ts in ts_list:
++            if _ts.values and max(_ts.values) > 0:
++                cause_metric = CauseMetric(ts=_ts, score=max(_ts.values))
++                result.append(cause_metric)
+
+-        result = sorted(result, key=lambda x: x[1], reverse=True)
++        result = sorted(result, key=lambda x: x.score, reverse=True)
+
+         return result
+
+-    def report(self, anomaly: Anomaly, entity_name: str, machine_id: str):
++    def report(self, anomaly: Anomaly, machine_id: str):
++        """Reports a single anomaly at a time"""
+         description = {f.metric: f.description for f in self.features}
+-        cause_metrics = self.detect_features(machine_id)
++        cause_metrics = self.find_cause_metrics(machine_id, anomaly.labels)
+         cause_metrics = [
+-            {'metric': cause[0].metric,
+-             'label': cause[0].labels,
+-             'score': cause[1],
+-             'description': description.get(cause[0].metric, '').format(
+-                 cause[0].labels.get('ppid', ''),
+-                 cause[0].labels.get('s_port', ''))}
++            {
++                'metric': cause.ts.metric,
++                'label': cause.ts.labels,
++                'score': cause.score,
++                'description': description.get(cause.ts.metric, '').format(
++                    cause.ts.labels.get('ppid', ''),
++                    cause.ts.labels.get('s_port', ''))
++            }
+             for cause in cause_metrics]
++
+         timestamp = dt.utc_now()
+-        template = SysAnomalyTemplate(timestamp, machine_id, anomaly.metric, entity_name)
+-        template.labels = anomaly.labels
++        template = 
SysAnomalyTemplate(timestamp, machine_id, anomaly.metric, anomaly.entity_name) + self.anomaly_report.sent_anomaly(anomaly, cause_metrics, template) +diff --git a/anteater/module/sys_tcp_transmission_detector.py b/anteater/module/sys_tcp_transmission_detector.py +index 9af4760..ad383b4 100644 +--- a/anteater/module/sys_tcp_transmission_detector.py ++++ b/anteater/module/sys_tcp_transmission_detector.py +@@ -16,11 +16,13 @@ import math + from anteater.core.anomaly import Anomaly + from anteater.model.algorithms.spectral_residual import SpectralResidual + from anteater.model.slope import smooth_slope ++from anteater.model.smoother import conv_smooth ++from anteater.model.three_sigma import three_sigma + from anteater.module.detector import Detector + from anteater.source.anomaly_report import AnomalyReport + from anteater.source.metric_loader import MetricLoader + from anteater.template.sys_anomaly_template import SysAnomalyTemplate +-from anteater.utils.data_load import load_kpi_feature ++from anteater.utils.common import divide + from anteater.utils.datetime import DateTimeManager as dt + from anteater.utils.log import logger + +@@ -31,40 +33,45 @@ class SysTcpTransmissionDetector(Detector): + """ + + def __init__(self, data_loader: MetricLoader, anomaly_report: AnomalyReport): +- super().__init__(data_loader, anomaly_report) +- self.kpis, self.features = load_kpi_feature('sys_tcp_transmission.json') ++ file_name = 'sys_tcp_transmission.json' ++ super().__init__(data_loader, anomaly_report, file_name) + + def execute_detect(self, machine_id: str): + for kpi in self.kpis: +- parameter = kpi.parameter +- start, end = dt.last(minutes=10) +- time_series_list = self.data_loader.get_metric( +- start, end, kpi.metric, label_name='machine_id', label_value=machine_id) ++ look_back = kpi.params.get('look_back', None) ++ box_pts = kpi.params.get('box_pts', None) ++ obs_size = kpi.params.get('obs_size', None) ++ outlier_ratio_th = kpi.params.get('outlier_ratio_th', None) + +- if not time_series_list: ++ start, end = dt.last(minutes=look_back) ++ ts_list = self.data_loader.\ ++ get_metric(start, end, kpi.metric, label_name='machine_id', label_value=machine_id) ++ ++ if not ts_list: + logger.warning(f'Key metric {kpi.metric} is null on the target machine {machine_id}!') + return + + point_count = self.data_loader.expected_point_length(start, end) + anomalies = [] +- threshold = parameter['threshold'] +- for time_series in time_series_list: +- if len(time_series.values) < point_count * 0.9 or len(time_series.values) > point_count * 1.5: ++ for _ts in ts_list: ++ if len(_ts.values) < point_count * 0.9 or len(_ts.values) > point_count * 1.5: + continue + +- if sum(time_series.values) == 0: ++ if sum(_ts.values) == 0: + continue + +- score = max(smooth_slope(time_series, windows_length=13)) +- +- if math.isnan(score) or math.isinf(score): +- continue ++ smoothed_val = conv_smooth(_ts.values, box_pts=box_pts) ++ outlier, mean, std = three_sigma(smoothed_val, obs_size=obs_size, method='min') ++ ratio = divide(len(outlier), obs_size) + +- if score > threshold: ++ if outlier and ratio >= outlier_ratio_th: ++ logger.info(f'Ratio: {ratio}, Outlier Ratio TH: {outlier_ratio_th}, ' ++ f'Mean: {mean}, Std: {std}') + anomalies.append( +- Anomaly(metric=time_series.metric, +- labels=time_series.labels, +- score=score, ++ Anomaly(metric=_ts.metric, ++ labels=_ts.labels, ++ score=ratio, ++ entity_name=kpi.entity_name, + description=kpi.description)) + + anomalies = sorted(anomalies, key=lambda x: x.score, reverse=True) 
+@@ -72,45 +79,45 @@ class SysTcpTransmissionDetector(Detector): + if anomalies: + logger.info('Sys io latency anomalies was detected.') + for anomaly in anomalies: +- self.report(anomaly, kpi.entity_name, machine_id) ++ self.report(anomaly, machine_id) + + def detect_features(self, machine_id: str, top_n=3): + priorities = {f.metric: f.priority for f in self.features} + start, end = dt.last(minutes=6) +- time_series_list = [] ++ ts_list = [] + for metric in priorities.keys(): +- time_series = self.data_loader.get_metric( +- start, end, metric, label_name='machine_id', label_value=machine_id) +- time_series_list.extend(time_series) ++ _ts = self.data_loader.\ ++ get_metric(start, end, metric, label_name='machine_id', label_value=machine_id) ++ ts_list.extend(_ts) + + point_count = self.data_loader.expected_point_length(start, end) +- sr_model = SpectralResidual(12, 24, 50) ++ model = SpectralResidual(12, 24, 50) + + result = [] +- for time_series in time_series_list: +- if len(time_series.values) < point_count * 0.9 or \ +- len(time_series.values) > point_count * 1.5: ++ for _ts in ts_list: ++ if len(_ts.values) < point_count * 0.9 or \ ++ len(_ts.values) > point_count * 1.5: + continue + +- values = time_series.values ++ values = _ts.values + + if all(x == values[0] for x in values): + continue + +- scores = sr_model.compute_score(values) ++ scores = model.compute_score(values) + score = max(scores[-13:]) + + if math.isnan(score) or math.isinf(score): + continue + +- result.append((time_series, score)) ++ result.append((_ts, score)) + + result = sorted(result, key=lambda x: x[1], reverse=True)[0: top_n] + result = sorted(result, key=lambda x: priorities[x[0].metric]) + + return result + +- def report(self, anomaly: Anomaly, entity_name: str, machine_id: str): ++ def report(self, anomaly: Anomaly, machine_id: str): + description = {f.metric: f.description for f in self.features} + cause_metrics = self.detect_features(machine_id, top_n=3) + cause_metrics = [ +@@ -124,6 +131,5 @@ class SysTcpTransmissionDetector(Detector): + cause[0].labels.get('server_port', ''))} + for cause in cause_metrics] + timestamp = dt.utc_now() +- template = SysAnomalyTemplate(timestamp, machine_id, anomaly.metric, entity_name) +- template.labels = anomaly.labels ++ template = SysAnomalyTemplate(timestamp, machine_id, anomaly.metric, anomaly.entity_name) + self.anomaly_report.sent_anomaly(anomaly, cause_metrics, template) +diff --git a/anteater/provider/base.py b/anteater/provider/base.py +index eda2d1b..35d1d1f 100644 +--- a/anteater/provider/base.py ++++ b/anteater/provider/base.py +@@ -56,16 +56,16 @@ class TimeSeriesProvider: + def fetch(url, params: Dict, **args) -> List: + """Fetches data from prometheus server by http request""" + try: +- response = requests.get(url, params, timeout=30, **args).json() ++ response = requests.get(url, params, timeout=30, **args) + except requests.RequestException as e: + logger.error(f"RequestException: {e}!") + return [] +- ++ response = response.json() + result = [] + if response and response.get("status") == 'success': + result = response.get('data', {}).get('result', []) + else: +- logger.error(f"PrometheusAdapter get data failed, " ++ logger.error(f"Prometheus get data failed, " + f"error: {response.get('error')}, query_url: {url}, params: {params}.") + + return result +@@ -87,17 +87,19 @@ class TimeSeriesProvider: + data = self.fetch(self.url, params, headers=headers) + + for item in data: +- zipped_values = list(zip(*item.get("values"))) ++ zipped_values = 
list(zip(*item.get('values'))) + time_stamps = list(zipped_values[0]) + values = [float(v) for v in zipped_values[1]] + +- key = tuple(sorted(item.get("metric").items())) ++ key = tuple(sorted(item.get('metric').items())) + if key in tmp_index: + result[tmp_index.get(key)].extend(time_stamps, values) + else: ++ labels = item.get('metric') ++ labels.pop('__name__', None) + time_series = TimeSeries( + metric, +- item.get("metric"), ++ labels, + time_stamps, + values) + tmp_index[key] = len(result) +diff --git a/anteater/source/anomaly_report.py b/anteater/source/anomaly_report.py +index 2205f44..41542b7 100644 +--- a/anteater/source/anomaly_report.py ++++ b/anteater/source/anomaly_report.py +@@ -48,6 +48,7 @@ class AnomalyReport: + entity_name = template.entity_name + labels = anomaly.labels + ++ template.labels = labels + template.entity_id = self.get_entity_id(machine_id, entity_name, labels, keys) + template.keys = keys + template.description = anomaly.description +diff --git a/anteater/template/template.py b/anteater/template/template.py +index 52befaa..d86a8cb 100644 +--- a/anteater/template/template.py ++++ b/anteater/template/template.py +@@ -21,8 +21,8 @@ class Template: + self.machine_id = machine_id + self.metric_id = metric_id + self.entity_name = entity_name +- self.labels = {} + ++ self.labels = {} + self.entity_id = "" + self.description = "" + self.cause_metrics = {} +diff --git a/anteater/utils/common.py b/anteater/utils/common.py +index d6c80ab..a99a1ef 100644 +--- a/anteater/utils/common.py ++++ b/anteater/utils/common.py +@@ -170,3 +170,15 @@ def divide(x, y): + return x / y + else: + return 0 ++ ++ ++def same_intersection_key_value(first: dict, second: dict): ++ """Checks there are same key value pairs between two dictionaries ++ intersections by the key ++ """ ++ same_keys = set(first.keys()) & set(second.keys()) ++ for key in same_keys: ++ if first[key] != second[key]: ++ return False ++ ++ return True +diff --git a/config/gala-anteater.yaml b/config/gala-anteater.yaml +index b771c16..a01eb47 100644 +--- a/config/gala-anteater.yaml ++++ b/config/gala-anteater.yaml +@@ -3,7 +3,7 @@ Global: + is_sys: false + + Kafka: +- server: "9.16.143.92" ++ server: "localhost" + port: "9092" + model_topic: "gala_anteater_hybrid_model" + model_group_id: "gala_anteater_1" +@@ -12,8 +12,8 @@ Kafka: + meta_entity_name: "sli" + + Prometheus: +- server: "10.137.17.122" +- port: "29090" ++ server: "localhost" ++ port: "9090" + steps: 5 + + Aom: +diff --git a/config/module/app_sli_rtt.json b/config/module/app_sli_rtt.json +index 4372f07..0744416 100644 +--- a/config/module/app_sli_rtt.json ++++ b/config/module/app_sli_rtt.json +@@ -6,9 +6,11 @@ + "entity_name": "sli", + "enable": true, + "description": "sli rtt 异常", +- "parameter": { +- "min_nsec": 200000000, +- "threshold": 0.8 ++ "params": { ++ "look_back": 10, ++ "box_pts": 3, ++ "obs_size": 25, ++ "outlier_ratio_th": 0.3 + } + }, + { +@@ -17,8 +19,11 @@ + "entity_name": "sli", + "enable": true, + "description": "sli tps 异常", +- "parameter": { +- "threshold": -0.5 ++ "params": { ++ "look_back": 10, ++ "box_pts": 3, ++ "obs_size": 25, ++ "outlier_ratio_th": 0.3 + } + } + ], +diff --git a/config/module/proc_io_latency.json b/config/module/proc_io_latency.json +index 05bcd81..967ee10 100644 +--- a/config/module/proc_io_latency.json ++++ b/config/module/proc_io_latency.json +@@ -6,8 +6,11 @@ + "entity_name": "thread", + "enable": true, + "description": "Process IO_wait time (unit: us)", +- "parameter": { +- "threshold": 0.5 ++ "params": { ++ 
"look_back": 10, ++ "box_pts": 3, ++ "obs_size": 25, ++ "outlier_ratio_th": 0.3 + } + }, + { +@@ -16,8 +19,11 @@ + "entity_name": "proc", + "enable": true, + "description": "I/O operation delay at the BIO layer (unit: us)", +- "parameter": { +- "threshold": 0.5 ++ "params": { ++ "look_back": 10, ++ "box_pts": 3, ++ "obs_size": 25, ++ "outlier_ratio_th": 0.3 + } + } + ], +diff --git a/config/module/sys_io_latency.json b/config/module/sys_io_latency.json +index 66e5c5a..f790192 100644 +--- a/config/module/sys_io_latency.json ++++ b/config/module/sys_io_latency.json +@@ -5,9 +5,12 @@ + "kpi_type": "", + "entity_name": "block", + "enable": true, +- "description": "Block I/O latency performance", +- "parameter": { +- "threshold": 0.5 ++ "description": "Block I/O latency performance is deteriorating!", ++ "params": { ++ "look_back": 20, ++ "box_pts": 7, ++ "obs_size": 25, ++ "outlier_ratio_th": 0.3 + } + } + ], +diff --git a/config/module/sys_tcp_establish.json b/config/module/sys_tcp_establish.json +index d695c2c..b6589ed 100644 +--- a/config/module/sys_tcp_establish.json ++++ b/config/module/sys_tcp_establish.json +@@ -6,7 +6,10 @@ + "entity_name": "tcp_link", + "enable": true, + "description": "RTT of syn packet(us): existing {:.0%} syn packets rtt are more than {:.0f} us", +- "parameter": {} ++ "params": { ++ "look_back": 30, ++ "outlier_ratio_th": 0.3 ++ } + } + ], + "Features": [ +diff --git a/config/module/sys_tcp_transmission.json b/config/module/sys_tcp_transmission.json +index 5347cc9..522056f 100644 +--- a/config/module/sys_tcp_transmission.json ++++ b/config/module/sys_tcp_transmission.json +@@ -6,8 +6,37 @@ + "entity_name": "tcp_link", + "enable": true, + "description": "Smoothed Round Trip Time(us)", +- "parameter": { +- "threshold": 0.5 ++ "params": { ++ "look_back": 10, ++ "box_pts": 3, ++ "obs_size": 25, ++ "outlier_ratio_th": 0.3 ++ } ++ }, ++ { ++ "metric": "gala_gopher_net_tcp_in_segs", ++ "kpi_type": "in_segs", ++ "entity_name": "tcp_link", ++ "enable": true, ++ "description": "Total number of segments received", ++ "params": { ++ "look_back": 10, ++ "box_pts": 3, ++ "obs_size": 25, ++ "outlier_ratio_th": 0.3 ++ } ++ }, ++ { ++ "metric": "gala_gopher_net_tcp_out_segs", ++ "kpi_type": "out_segs", ++ "entity_name": "tcp_link", ++ "enable": true, ++ "description": "Total number of segments sent", ++ "params": { ++ "look_back": 10, ++ "box_pts": 3, ++ "obs_size": 25, ++ "outlier_ratio_th": 0.3 + } + } + ], +-- +2.37.0.windows.1 +