diff --git a/add_metrics_anomaly_trends_indicator.patch b/add_metrics_anomaly_trends_indicator.patch deleted file mode 100644 index 882af81..0000000 --- a/add_metrics_anomaly_trends_indicator.patch +++ /dev/null @@ -1,284 +0,0 @@ -From ece4a0551bd81f64158ab465a865e31e97b63562 Mon Sep 17 00:00:00 2001 -From: lizhenxing11 -Date: Mon, 21 Nov 2022 14:54:20 +0800 -Subject: [PATCH 2/2] Add Metrics Anomaly Trends Indicator - -update config file ---- - anteater/core/feature.py | 8 +++++ - anteater/model/slope.py | 15 +++++++++ - anteater/module/app_sli_detector.py | 20 ++++++++--- - anteater/utils/data_load.py | 18 ++++++++-- - config/module/app_sli_rtt.json | 52 ++++++++++++++++++++--------- - 5 files changed, 90 insertions(+), 23 deletions(-) - -diff --git a/anteater/core/feature.py b/anteater/core/feature.py -index 306d835..6db764d 100644 ---- a/anteater/core/feature.py -+++ b/anteater/core/feature.py -@@ -12,6 +12,13 @@ - # ******************************************************************************/ - - from dataclasses import dataclass -+from enum import Enum -+ -+ -+class AnomalyTrend(Enum): -+ DEFAULT = 0 -+ RISE = 1 -+ FALL = 2 - - - @dataclass -@@ -19,3 +26,4 @@ class Feature: - metric: str - description: str - priority: int = 0 -+ atrend: AnomalyTrend = AnomalyTrend.DEFAULT -diff --git a/anteater/model/slope.py b/anteater/model/slope.py -index 422d6bc..08c4211 100644 ---- a/anteater/model/slope.py -+++ b/anteater/model/slope.py -@@ -29,3 +29,18 @@ def smooth_slope(time_series, windows_length): - val = conv_smooth(time_series.to_df(), box_pts=13) - val = slope(val, win_len=13) - return val[-windows_length:] -+ -+ -+def trend(y, win_len=None): -+ """Gets the trend for the y""" -+ if not win_len: -+ win_len = len(y) // 2 -+ -+ if np.mean(y[:win_len]) < np.mean(y[-win_len:]): -+ return 1 -+ -+ elif np.mean(y[:win_len]) > np.mean(y[-win_len:]): -+ return -1 -+ -+ else: -+ return 0 -diff --git a/anteater/module/app_sli_detector.py b/anteater/module/app_sli_detector.py -index b69f73c..b63f5e2 100644 ---- a/anteater/module/app_sli_detector.py -+++ b/anteater/module/app_sli_detector.py -@@ -20,7 +20,9 @@ import math - from typing import List - - from anteater.core.anomaly import Anomaly -+from anteater.core.feature import AnomalyTrend - from anteater.model.algorithms.spectral_residual import SpectralResidual -+from anteater.model.slope import trend - from anteater.model.smoother import conv_smooth - from anteater.model.three_sigma import three_sigma - from anteater.module.detector import Detector -@@ -134,10 +136,11 @@ class APPSliDetector(Detector): - - return anomalies - -- def detect_features(self, metrics, machine_id: str, top_n): -+ def detect_features(self, machine_id: str, top_n): -+ metric_atrend = {f.metric: f.atrend for f in self.features} - start, end = dt.last(minutes=6) - time_series_list = [] -- for metric in metrics: -+ for metric in metric_atrend.keys(): - time_series = self.data_loader.get_metric( - start, end, metric, label_name='machine_id', label_value=machine_id) - time_series_list.extend(time_series) -@@ -156,8 +159,16 @@ class APPSliDetector(Detector): - if all(x == values[0] for x in values): - continue - -+ if trend(time_series.values) < 0 and \ -+ metric_atrend[time_series.metric] == AnomalyTrend.RISE: -+ continue -+ -+ if trend(time_series.values) > 0 and \ -+ metric_atrend[time_series.metric] == AnomalyTrend.FALL: -+ continue -+ - scores = sr_model.compute_score(values) -- score = max(scores[-13:]) -+ score = max(scores[-25:]) - - if math.isnan(score) or math.isinf(score): - continue -@@ -170,9 +181,8 @@ class APPSliDetector(Detector): - - def report(self, anomaly: Anomaly, machine_id: str): - """Reports a single anomaly at each time""" -- feature_metrics = [f.metric for f in self.features] - description = {f.metric: f.description for f in self.features} -- cause_metrics = self.detect_features(feature_metrics, machine_id, top_n=60) -+ cause_metrics = self.detect_features(machine_id, top_n=60) - cause_metrics = [ - {'metric': cause[0].metric, - 'label': cause[0].labels, -diff --git a/anteater/utils/data_load.py b/anteater/utils/data_load.py -index 108d5ed..f8ce277 100644 ---- a/anteater/utils/data_load.py -+++ b/anteater/utils/data_load.py -@@ -17,7 +17,7 @@ from os import path, sep - from json import JSONDecodeError - from typing import List, Tuple - --from anteater.core.feature import Feature -+from anteater.core.feature import AnomalyTrend, Feature - from anteater.core.kpi import KPI - from anteater.utils.log import logger - -@@ -76,7 +76,21 @@ def load_kpi_feature(file_name) -> Tuple[List[KPI], List[Feature]]: - raise e - - kpis = [KPI(**param) for param in params.get('KPI')] -- features = [Feature(**param) for param in params.get('Features')] -+ -+ features = [] -+ for param in params.get('Features'): -+ parsed_param = {} -+ for key, value in param.items(): -+ if key == 'atrend': -+ if value.lower() == 'rise': -+ value = AnomalyTrend.RISE -+ elif value.lower() == 'fall': -+ value = AnomalyTrend.FALL -+ else: -+ value = AnomalyTrend.DEFAULT -+ parsed_param[key] = value -+ -+ features.append(Feature(**parsed_param)) - - if duplicated_metric([kpi.metric for kpi in kpis]) or \ - duplicated_metric([f.metric for f in features]): -diff --git a/config/module/app_sli_rtt.json b/config/module/app_sli_rtt.json -index 0744416..b7f78b7 100644 ---- a/config/module/app_sli_rtt.json -+++ b/config/module/app_sli_rtt.json -@@ -34,19 +34,23 @@ - }, - { - "metric": "gala_gopher_block_latency_req_jitter", -- "description": "block层request时延抖动异常" -+ "description": "block层request时延抖动异常", -+ "atrend": "rise" - }, - { - "metric": "gala_gopher_block_latency_req_last", -- "description": "block层request时延最近值异常" -+ "description": "block层request时延最近值异常", -+ "atrend": "rise" - }, - { - "metric": "gala_gopher_block_latency_req_max", -- "description": "block层request时延最大值异常" -+ "description": "block层request时延最大值异常", -+ "atrend": "rise" - }, - { - "metric": "gala_gopher_block_latency_req_sum", -- "description": "block层request时延总计值异常" -+ "description": "block层request时延总计值异常", -+ "atrend": "rise" - }, - { - "metric": "gala_gopher_cpu_iowait_total_second", -@@ -54,11 +58,13 @@ - }, - { - "metric": "gala_gopher_cpu_user_total_second", -- "description": "用户态cpu占用时间(不包括nice)异常" -+ "description": "用户态cpu占用时间(不包括nice)异常", -+ "atrend": "rise" - }, - { - "metric": "gala_gopher_cpu_total_used_per", -- "description": "CPU总利用率异常" -+ "description": "CPU总利用率异常", -+ "atrend": "rise" - }, - { - "metric": "gala_gopher_cpu_backlog_drops", -@@ -86,7 +92,8 @@ - }, - { - "metric": "gala_gopher_disk_r_await", -- "description": "读响应时间异常" -+ "description": "读响应时间异常", -+ "atrend": "rise" - }, - { - "metric": "gala_gopher_disk_rareq", -@@ -94,19 +101,23 @@ - }, - { - "metric": "gala_gopher_disk_rspeed", -- "description": "读速率(IOPS)异常" -+ "description": "读速率(IOPS)异常", -+ "atrend": "rise" - }, - { - "metric": "gala_gopher_disk_rspeed_kB", -- "description": "读吞吐量异常" -+ "description": "读吞吐量异常", -+ "atrend": "rise" - }, - { - "metric": "gala_gopher_disk_util", -- "description": "磁盘使用率异常" -+ "description": "磁盘使用率异常", -+ "atrend": "rise" - }, - { - "metric": "gala_gopher_disk_w_await", -- "description": "写响应时间异常" -+ "description": "写响应时间异常", -+ "atrend": "rise" - }, - { - "metric": "gala_gopher_disk_wareq", -@@ -114,19 +125,23 @@ - }, - { - "metric": "gala_gopher_disk_wspeed", -- "description": "写速率(IOPS)异常" -+ "description": "写速率(IOPS)异常", -+ "atrend": "rise" - }, - { - "metric": "gala_gopher_disk_wspeed_kB", -- "description": "写吞吐量异常" -+ "description": "写吞吐量异常", -+ "atrend": "rise" - }, - { - "metric": "gala_gopher_proc_read_bytes", -- "description": "进程实际从磁盘读取的字节数异常" -+ "description": "进程实际从磁盘读取的字节数异常", -+ "atrend": "rise" - }, - { - "metric": "gala_gopher_proc_write_bytes", -- "description": "进程实际从磁盘写入的字节数异常" -+ "description": "进程实际从磁盘写入的字节数异常", -+ "atrend": "rise" - }, - { - "metric": "gala_gopher_net_tcp_retrans_segs", -@@ -134,7 +149,12 @@ - }, - { - "metric": "gala_gopher_tcp_link_lost_out", -- "description": "TPC丢包数异常" -+ "description": "TCP丢包数异常" -+ }, -+ { -+ "metric": "gala_gopher_tcp_link_srtt", -+ "description": "TCP超时", -+ "atrend": "rise" - }, - { - "metric": "gala_gopher_tcp_link_notack_bytes", --- -2.37.0.windows.1 - diff --git a/gala-anteater-1.0.0.tar.gz b/gala-anteater-1.0.0.tar.gz deleted file mode 100644 index 05ad264..0000000 Binary files a/gala-anteater-1.0.0.tar.gz and /dev/null differ diff --git a/gala-anteater-1.0.1.tar.gz b/gala-anteater-1.0.1.tar.gz new file mode 100644 index 0000000..b309b88 Binary files /dev/null and b/gala-anteater-1.0.1.tar.gz differ diff --git a/gala-anteater.spec b/gala-anteater.spec index 64f4e48..a6d7685 100644 --- a/gala-anteater.spec +++ b/gala-anteater.spec @@ -1,8 +1,8 @@ %define debug_package %{nil} Name: gala-anteater -Version: 1.0.0 -Release: 2 +Version: 1.0.1 +Release: 1 Summary: A time-series anomaly detection platform for operating system. License: MulanPSL2 URL: https://gitee.com/openeuler/gala-anteater @@ -11,9 +11,6 @@ BuildRoot: %{_builddir}/%{name}-%{version} BuildRequires: procps-ng python3-setuptools Requires: python3-gala-anteater = %{version}-%{release} -patch0: update_sys_io_latency_detector_model.patch -patch1: add_metrics_anomaly_trends_indicator.patch - %description Abnormal detection module for A-Ops project @@ -50,7 +47,8 @@ Python3 package of gala-anteater %config(noreplace) %{_sysconfdir}/%{name}/config/module/proc_io_latency.json %config(noreplace) %{_sysconfdir}/%{name}/config/module/sys_io_latency.json %config(noreplace) %{_sysconfdir}/%{name}/config/module/sys_tcp_establish.json -%config(noreplace) %{_sysconfdir}/%{name}/config/module/sys_tcp_transmission.json +%config(noreplace) %{_sysconfdir}/%{name}/config/module/sys_tcp_transmission_latency.json +%config(noreplace) %{_sysconfdir}/%{name}/config/module/sys_tcp_transmission_throughput.json %files -n python3-gala-anteater @@ -59,6 +57,9 @@ Python3 package of gala-anteater %changelog +* Wed Nov 30 2022 Li Zhenxing - 1.0.1-1 +- Add sys level anomaly detection and cause inference + * Tue Nov 22 2022 Li Zhenxing - 1.0.0-2 - Updates anomaly detection model and imporves cause inference result diff --git a/update_sys_io_latency_detector_model.patch b/update_sys_io_latency_detector_model.patch deleted file mode 100644 index 09dd280..0000000 --- a/update_sys_io_latency_detector_model.patch +++ /dev/null @@ -1,1215 +0,0 @@ -From 0b2243b61fe5083784e634db0d97c2888330eb0a Mon Sep 17 00:00:00 2001 -From: lizhenxing11 -Date: Mon, 14 Nov 2022 19:43:17 +0800 -Subject: [PATCH 1/2] Update sys io latency detector model - -Add tcp detector and update model - -fix code check issue ---- - anteater/core/anomaly.py | 8 ++ - anteater/core/kpi.py | 2 +- - anteater/model/three_sigma.py | 37 ++++++ - anteater/module/app_sli_detector.py | 93 ++++++++------- - anteater/module/detector.py | 54 +++++++-- - anteater/module/proc_io_latency_detector.py | 49 +++++--- - anteater/module/sys_io_latency_detector.py | 109 +++++++++--------- - anteater/module/sys_tcp_establish_detector.py | 108 +++++++++-------- - .../module/sys_tcp_transmission_detector.py | 76 ++++++------ - anteater/provider/base.py | 14 ++- - anteater/source/anomaly_report.py | 1 + - anteater/template/template.py | 2 +- - anteater/utils/common.py | 12 ++ - config/gala-anteater.yaml | 6 +- - config/module/app_sli_rtt.json | 15 ++- - config/module/proc_io_latency.json | 14 ++- - config/module/sys_io_latency.json | 9 +- - config/module/sys_tcp_establish.json | 5 +- - config/module/sys_tcp_transmission.json | 33 +++++- - 19 files changed, 416 insertions(+), 231 deletions(-) - create mode 100644 anteater/model/three_sigma.py - -diff --git a/anteater/core/anomaly.py b/anteater/core/anomaly.py -index cce7767..b95eeab 100644 ---- a/anteater/core/anomaly.py -+++ b/anteater/core/anomaly.py -@@ -13,6 +13,8 @@ - - from dataclasses import dataclass - -+from anteater.utils.time_series import TimeSeries -+ - - @dataclass - class Anomaly: -@@ -21,3 +23,9 @@ class Anomaly: - score: float = None - entity_name: str = None - description: str = None -+ -+ -+@dataclass -+class CauseMetric: -+ ts: TimeSeries -+ score: float -diff --git a/anteater/core/kpi.py b/anteater/core/kpi.py -index db4c046..620ffdf 100644 ---- a/anteater/core/kpi.py -+++ b/anteater/core/kpi.py -@@ -21,5 +21,5 @@ class KPI: - entity_name: str = None - enable: bool = False - description: str = "" -- parameter: dict = field(default=dict) -+ params: dict = field(default=dict) - -diff --git a/anteater/model/three_sigma.py b/anteater/model/three_sigma.py -new file mode 100644 -index 0000000..08d05ba ---- /dev/null -+++ b/anteater/model/three_sigma.py -@@ -0,0 +1,37 @@ -+#!/usr/bin/python3 -+# ****************************************************************************** -+# Copyright (c) 2022 Huawei Technologies Co., Ltd. -+# gala-anteater is licensed under Mulan PSL v2. -+# You can use this software according to the terms and conditions of the Mulan PSL v2. -+# You may obtain a copy of Mulan PSL v2 at: -+# http://license.coscl.org.cn/MulanPSL2 -+# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, -+# EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, -+# MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. -+# See the Mulan PSL v2 for more details. -+# ******************************************************************************/ -+ -+ -+import numpy as np -+ -+ -+def three_sigma(values, obs_size, method="abs"): -+ """The '3-sigma rule' outlier detect function""" -+ if obs_size <= 0: -+ raise ValueError("The obs_size should great than zero!") -+ if len(values) <= obs_size: -+ raise ValueError("The obs_size should be great than values' length") -+ train_val = values[:-obs_size] -+ obs_val = values[-obs_size:] -+ mean = np.mean(train_val) -+ std = np.std(train_val) -+ if method == "abs": -+ outlier = [val for val in obs_val if abs(val - mean) > 3 * std] -+ elif method == 'min': -+ outlier = [val for val in obs_val if val < mean - 3 * std] -+ elif method == 'max': -+ outlier = [val for val in obs_val if val > mean + 3 * std] -+ else: -+ raise ValueError(f'Unknown method {method}') -+ -+ return outlier, mean, std -diff --git a/anteater/module/app_sli_detector.py b/anteater/module/app_sli_detector.py -index e38d53e..b69f73c 100644 ---- a/anteater/module/app_sli_detector.py -+++ b/anteater/module/app_sli_detector.py -@@ -19,17 +19,15 @@ Description: The anomaly detector implementation on APP Sli - import math - from typing import List - --import numpy as np -- - from anteater.core.anomaly import Anomaly - from anteater.model.algorithms.spectral_residual import SpectralResidual --from anteater.model.slope import smooth_slope - from anteater.model.smoother import conv_smooth -+from anteater.model.three_sigma import three_sigma - from anteater.module.detector import Detector - from anteater.source.anomaly_report import AnomalyReport - from anteater.source.metric_loader import MetricLoader - from anteater.template.app_anomaly_template import AppAnomalyTemplate --from anteater.utils.data_load import load_kpi_feature -+from anteater.utils.common import divide - from anteater.utils.datetime import DateTimeManager as dt - from anteater.utils.log import logger - -@@ -40,48 +38,52 @@ class APPSliDetector(Detector): - """ - - def __init__(self, data_loader: MetricLoader, anomaly_report: AnomalyReport): -- super().__init__(data_loader, anomaly_report) -- self.kpis, self.features = load_kpi_feature('app_sli_rtt.json') -+ file_name = 'app_sli_rtt.json' -+ super().__init__(data_loader, anomaly_report, file_name) - - def execute_detect(self, machine_id: str): - for kpi in self.kpis: -- parameter = kpi.parameter - if kpi.kpi_type == 'rtt': -- anomalies = self.detect_rtt(kpi, machine_id, parameter) -+ anomalies = self.detect_rtt(kpi, machine_id) - else: -- anomalies = self.detect_tps(kpi, machine_id, parameter) -+ anomalies = self.detect_tps(kpi, machine_id) - - for anomaly in anomalies: -- self.report(anomaly, kpi.entity_name, machine_id) -+ self.report(anomaly, machine_id) - -- def detect_rtt(self, kpi, machine_id: str, parameter: dict) -> List[Anomaly]: -+ def detect_rtt(self, kpi, machine_id: str) -> List[Anomaly]: - """Detects rtt by rule-based model""" -- start, end = dt.last(minutes=10) -- time_series_list = self.data_loader.get_metric( -+ look_back = kpi.params.get('look_back', None) -+ box_pts = kpi.params.get('box_pts', None) -+ obs_size = kpi.params.get('obs_size', None) -+ outlier_ratio_th = kpi.params.get('outlier_ratio_th', None) -+ -+ start, end = dt.last(minutes=look_back) -+ ts_list = self.data_loader.get_metric( - start, end, kpi.metric, label_name='machine_id', label_value=machine_id) - -- if not time_series_list: -+ if not ts_list: - logger.warning(f'Key metric {kpi.metric} is null on the target machine {machine_id}!') - return [] - - point_count = self.data_loader.expected_point_length(start, end) - anomalies = [] -- threshold = parameter['threshold'] -- min_nsec = parameter['min_nsec'] -- for time_series in time_series_list: -- if len(time_series.values) < point_count * 0.9 or len(time_series.values) > point_count * 1.5: -+ for _ts in ts_list: -+ if len(_ts.values) < point_count * 0.9 or len(_ts.values) > point_count * 1.5: - continue - -- score = max(smooth_slope(time_series, windows_length=13)) -- if math.isnan(score) or math.isinf(score): -- continue -+ smoothed_val = conv_smooth(_ts.values, box_pts=box_pts) -+ outlier, mean, std = three_sigma(smoothed_val, obs_size=obs_size, method='min') -+ ratio = divide(len(outlier), obs_size) - -- avg_nsec = np.mean(time_series.values[-13:]) -- if score >= threshold and avg_nsec >= min_nsec: -+ if outlier and ratio >= outlier_ratio_th: -+ logger.info(f'Ratio: {ratio}, Outlier Ratio TH: {outlier_ratio_th}, ' -+ f'Mean: {mean}, Std: {std}') - anomalies.append( -- Anomaly(metric=time_series.metric, -- labels=time_series.labels, -- score=score, -+ Anomaly(metric=_ts.metric, -+ labels=_ts.labels, -+ score=ratio, -+ entity_name=kpi.entity_name, - description=kpi.description)) - - anomalies = sorted(anomalies, key=lambda x: x.score, reverse=True) -@@ -91,9 +93,14 @@ class APPSliDetector(Detector): - - return anomalies - -- def detect_tps(self, kpi, machine_id: str, parameter: dict) -> List[Anomaly]: -+ def detect_tps(self, kpi, machine_id: str) -> List[Anomaly]: - """Detects tps by rule based model""" -- start, end = dt.last(minutes=10) -+ look_back = kpi.params.get('look_back', None) -+ box_pts = kpi.params.get('box_pts', None) -+ obs_size = kpi.params.get('obs_size', None) -+ outlier_ratio_th = kpi.params.get('outlier_ratio_th', None) -+ -+ start, end = dt.last(minutes=look_back) - time_series_list = self.data_loader.get_metric( - start, end, kpi.metric, label_name='machine_id', label_value=machine_id) - -@@ -103,21 +110,21 @@ class APPSliDetector(Detector): - - point_count = self.data_loader.expected_point_length(start, end) - anomalies = [] -- threshold = parameter['threshold'] -- for time_series in time_series_list: -- if len(time_series.values) < point_count * 0.9 or len(time_series.values) > point_count * 1.5: -+ for _ts in time_series_list: -+ if len(_ts.values) < point_count * 0.9 or len(_ts.values) > point_count * 1.5: - continue -- pre_values = time_series.values[:-25] -- cur_values = time_series.values[-25:] -- mean = np.mean(pre_values) -- std = np.std(pre_values) -- outlier = [val for val in cur_values if val < mean - 3 * std] -+ smoothed_val = conv_smooth(_ts.values, box_pts=box_pts) -+ outlier, mean, std = three_sigma(smoothed_val, obs_size=obs_size, method='min') -+ ratio = divide(len(outlier), obs_size) - -- if outlier and len(outlier) >= len(cur_values) * 0.3: -+ if outlier and ratio >= outlier_ratio_th: -+ logger.info(f'Ratio: {ratio}, Outlier Ratio TH: {outlier_ratio_th}, ' -+ f'Mean: {mean}, Std: {std}') - anomalies.append( -- Anomaly(metric=time_series.metric, -- labels=time_series.labels, -- score=1, -+ Anomaly(metric=_ts.metric, -+ labels=_ts.labels, -+ score=ratio, -+ entity_name=kpi.entity_name, - description=kpi.description)) - - anomalies = sorted(anomalies, key=lambda x: x.score, reverse=True) -@@ -161,7 +168,8 @@ class APPSliDetector(Detector): - - return result[0: top_n] - -- def report(self, anomaly: Anomaly, entity_name: str, machine_id: str): -+ def report(self, anomaly: Anomaly, machine_id: str): -+ """Reports a single anomaly at each time""" - feature_metrics = [f.metric for f in self.features] - description = {f.metric: f.description for f in self.features} - cause_metrics = self.detect_features(feature_metrics, machine_id, top_n=60) -@@ -172,6 +180,5 @@ class APPSliDetector(Detector): - 'description': description.get(cause[0].metric, '')} - for cause in cause_metrics] - timestamp = dt.utc_now() -- template = AppAnomalyTemplate(timestamp, machine_id, anomaly.metric, entity_name) -- template.labels = anomaly.labels -+ template = AppAnomalyTemplate(timestamp, machine_id, anomaly.metric, anomaly.entity_name) - self.anomaly_report.sent_anomaly(anomaly, cause_metrics, template) -diff --git a/anteater/module/detector.py b/anteater/module/detector.py -index 51dabbd..bfe516e 100644 ---- a/anteater/module/detector.py -+++ b/anteater/module/detector.py -@@ -11,37 +11,69 @@ - # See the Mulan PSL v2 for more details. - # ******************************************************************************/ - --from abc import abstractmethod -+from abc import abstractmethod, ABC - - from anteater.core.anomaly import Anomaly - from anteater.source.anomaly_report import AnomalyReport - from anteater.source.metric_loader import MetricLoader -+from anteater.utils.common import same_intersection_key_value -+from anteater.utils.data_load import load_kpi_feature - from anteater.utils.datetime import DateTimeManager as dt - from anteater.utils.log import logger - from anteater.utils.timer import timer - - --class Detector: -- """The base detector class""" -- def __init__(self, data_loader: MetricLoader, anomaly_report: AnomalyReport): -+class Detector(ABC): -+ """The anomaly detector base class""" -+ def __init__( -+ self, -+ data_loader: MetricLoader, -+ anomaly_report: AnomalyReport, -+ file_name: str): -+ """The detector base class initializer""" - self.data_loader = data_loader - self.anomaly_report = anomaly_report -+ self.kpis, self.features = load_kpi_feature(file_name) -+ -+ @staticmethod -+ def filter_ts(ts_list, filters): -+ result = [] -+ for _ts in ts_list: -+ if same_intersection_key_value(_ts.labels, filters): -+ result.append(_ts) -+ -+ return result - - @abstractmethod - def execute_detect(self, machine_id): -+ """Executes anomaly detection on specified machine id""" -+ pass -+ -+ @abstractmethod -+ def report(self, anomaly: Anomaly, machine_id: str): -+ """Reports a single anomaly at each time""" - pass - - @timer - def detect(self): -+ """The main function of detector""" -+ if not self.kpis: -+ logger.debug(f"Null kpis in detector: {self.__class__.__name__}!") -+ return -+ - logger.info(f"Run detector: {self.__class__.__name__}!") -- start, end = dt.last(minutes=1) -- metrics_kpi = [k.metric for k in self.kpis] -- metrics_feat = [f.metric for f in self.features] -- metrics = metrics_kpi + metrics_feat -- machine_ids = self.data_loader.get_unique_machines(start, end, metrics) -+ self.pre_process() -+ machine_ids = self.get_unique_machine_id() - for _id in machine_ids: - self.execute_detect(_id) - -- @abstractmethod -- def report(self, anomaly: Anomaly, entity_name: str, machine_id: str): -+ def pre_process(self): -+ """Executes pre-process for generating necessary parameters""" - pass -+ -+ def get_unique_machine_id(self): -+ """Gets unique machine ids during past minutes""" -+ start, end = dt.last(minutes=1) -+ metrics = [_kpi.metric for _kpi in self.kpis] -+ machine_ids = self.data_loader.get_unique_machines(start, end, metrics) -+ return machine_ids -diff --git a/anteater/module/proc_io_latency_detector.py b/anteater/module/proc_io_latency_detector.py -index ee1d7c6..3ea2c51 100644 ---- a/anteater/module/proc_io_latency_detector.py -+++ b/anteater/module/proc_io_latency_detector.py -@@ -16,11 +16,13 @@ import math - from anteater.core.anomaly import Anomaly - from anteater.model.algorithms.spectral_residual import SpectralResidual - from anteater.model.slope import smooth_slope -+from anteater.model.smoother import conv_smooth -+from anteater.model.three_sigma import three_sigma - from anteater.module.detector import Detector - from anteater.source.anomaly_report import AnomalyReport - from anteater.source.metric_loader import MetricLoader - from anteater.template.sys_anomaly_template import SysAnomalyTemplate --from anteater.utils.data_load import load_kpi_feature -+from anteater.utils.common import divide - from anteater.utils.datetime import DateTimeManager as dt - from anteater.utils.log import logger - -@@ -31,40 +33,50 @@ class ProcIOLatencyDetector(Detector): - """ - - def __init__(self, data_loader: MetricLoader, anomaly_report: AnomalyReport): -- super().__init__(data_loader, anomaly_report) -- self.kpis, self.features = load_kpi_feature('proc_io_latency.json') -+ file_name = 'proc_io_latency.json' -+ super().__init__(data_loader, anomaly_report, file_name) - - def execute_detect(self, machine_id): - for kpi in self.kpis: -- parameter = kpi.parameter -- start, end = dt.last(minutes=10) -- time_series_list = self.data_loader.get_metric( -+ look_back = kpi.params.get('look_back', None) -+ box_pts = kpi.params.get('box_pts', None) -+ obs_size = kpi.params.get('obs_size', None) -+ outlier_ratio_th = kpi.params.get('outlier_ratio_th', None) -+ -+ start, end = dt.last(minutes=look_back) -+ ts_list = self.data_loader.get_metric( - start, end, kpi.metric, label_name='machine_id', label_value=machine_id) - -- if not time_series_list: -+ if not ts_list: - logger.warning(f'Key metric {kpi.metric} is null on the target machine {machine_id}!') - return - - point_count = self.data_loader.expected_point_length(start, end) - anomalies = [] -- threshold = parameter['threshold'] -- for time_series in time_series_list: -- if len(time_series.values) < point_count * 0.9 or len(time_series.values) > point_count * 1.5: -+ for _ts in ts_list: -+ if len(_ts.values) < point_count * 0.9 or len(_ts.values) > point_count * 1.5: - continue - -- if sum(time_series.values) == 0: -+ if sum(_ts.values) == 0: - continue - -- score = max(smooth_slope(time_series, windows_length=13)) -+ score = max(smooth_slope(_ts, windows_length=13)) - - if math.isnan(score) or math.isinf(score): - continue - -- if score > threshold: -+ smoothed_val = conv_smooth(_ts.values, box_pts=box_pts) -+ outlier, mean, std = three_sigma(smoothed_val, obs_size=obs_size, method='min') -+ ratio = divide(len(outlier), obs_size) -+ -+ if outlier and ratio >= outlier_ratio_th: -+ logger.info(f'Ratio: {ratio}, Outlier Ratio TH: {outlier_ratio_th}, ' -+ f'Mean: {mean}, Std: {std}') - anomalies.append( -- Anomaly(metric=time_series.metric, -- labels=time_series.labels, -+ Anomaly(metric=_ts.metric, -+ labels=_ts.labels, - score=score, -+ entity_name=kpi.entity_name, - description=kpi.description)) - - anomalies = sorted(anomalies, key=lambda x: x.score, reverse=True) -@@ -72,7 +84,7 @@ class ProcIOLatencyDetector(Detector): - if anomalies: - logger.info('Sys io latency anomalies was detected.') - for anomaly in anomalies: -- self.report(anomaly, kpi.entity_name, machine_id) -+ self.report(anomaly, machine_id) - - def detect_features(self, machine_id: str, top_n=3): - priorities = {f.metric: f.priority for f in self.features} -@@ -110,7 +122,7 @@ class ProcIOLatencyDetector(Detector): - - return result - -- def report(self, anomaly: Anomaly, entity_name: str, machine_id: str): -+ def report(self, anomaly: Anomaly, machine_id: str): - description = {f.metric: f.description for f in self.features} - cause_metrics = self.detect_features(machine_id, top_n=3) - cause_metrics = [ -@@ -123,6 +135,5 @@ class ProcIOLatencyDetector(Detector): - cause[0].labels.get('comm', ''))} - for cause in cause_metrics] - timestamp = dt.utc_now() -- template = SysAnomalyTemplate(timestamp, machine_id, anomaly.metric, entity_name) -- template.labels = anomaly.labels -+ template = SysAnomalyTemplate(timestamp, machine_id, anomaly.metric, anomaly.entity_name) - self.anomaly_report.sent_anomaly(anomaly, cause_metrics, template) -diff --git a/anteater/module/sys_io_latency_detector.py b/anteater/module/sys_io_latency_detector.py -index f459a57..c29ce72 100644 ---- a/anteater/module/sys_io_latency_detector.py -+++ b/anteater/module/sys_io_latency_detector.py -@@ -15,12 +15,13 @@ import math - - from anteater.core.anomaly import Anomaly - from anteater.model.algorithms.spectral_residual import SpectralResidual --from anteater.model.slope import smooth_slope -+from anteater.model.smoother import conv_smooth -+from anteater.model.three_sigma import three_sigma - from anteater.module.detector import Detector - from anteater.source.anomaly_report import AnomalyReport - from anteater.source.metric_loader import MetricLoader - from anteater.template.sys_anomaly_template import SysAnomalyTemplate --from anteater.utils.data_load import load_kpi_feature -+from anteater.utils.common import divide, same_intersection_key_value - from anteater.utils.datetime import DateTimeManager as dt - from anteater.utils.log import logger - -@@ -31,100 +32,104 @@ class SysIOLatencyDetector(Detector): - """ - - def __init__(self, data_loader: MetricLoader, anomaly_report: AnomalyReport): -- super().__init__(data_loader, anomaly_report) -- self.kpis, self.features = load_kpi_feature('sys_io_latency.json') -+ """The system i/o latency detector initializer""" -+ file_name = 'sys_io_latency.json' -+ super().__init__(data_loader, anomaly_report, file_name) - - def execute_detect(self, machine_id: str): -+ """Executes the detector based on machine id""" - kpi = self.kpis[0] -- parameter = kpi.parameter -+ look_back = kpi.params.get('look_back', None) -+ box_pts = kpi.params.get('box_pts', None) -+ obs_size = kpi.params.get('obs_size', None) -+ outlier_ratio_th = kpi.params.get('outlier_ratio_th', None) - -- start, end = dt.last(minutes=10) -- time_series_list = self.data_loader.get_metric( -- start, end, kpi.metric, label_name='machine_id', label_value=machine_id) -+ start, end = dt.last(minutes=look_back) -+ ts_list = self.data_loader.\ -+ get_metric(start, end, kpi.metric, label_name='machine_id', label_value=machine_id) - -- if not time_series_list: -+ if not ts_list: - logger.warning(f'Key metric {kpi.metric} is null on the target machine {machine_id}!') - return - - point_count = self.data_loader.expected_point_length(start, end) - anomalies = [] -- threshold = parameter['threshold'] -- for time_series in time_series_list: -- if len(time_series.values) < point_count * 0.9 or len(time_series.values) > point_count * 1.5: -+ for _ts in ts_list: -+ if len(_ts.values) < point_count * 0.9 or len(_ts.values) > point_count * 1.5: - continue - -- if sum(time_series.values) == 0: -+ if sum(_ts.values) == 0: - continue - -- score = max(smooth_slope(time_series, windows_length=13)) -+ smoothed_val = conv_smooth(_ts.values, box_pts=box_pts) -+ outlier, mean, std = three_sigma(smoothed_val, obs_size=obs_size, method='max') -+ ratio = divide(len(outlier), obs_size) - -- if math.isnan(score) or math.isinf(score): -- continue -- -- if score > threshold: -+ if outlier and ratio >= outlier_ratio_th: -+ logger.info(f'Ratio: {ratio}, Outlier Ratio TH: {outlier_ratio_th}, ' -+ f'Mean: {mean}, Std: {std}') - anomalies.append( -- Anomaly(metric=time_series.metric, -- labels=time_series.labels, -- score=score, -+ Anomaly(metric=_ts.metric, -+ labels=_ts.labels, -+ score=ratio, -+ entity_name=kpi.entity_name, - description=kpi.description)) - -- anomalies = sorted(anomalies, key=lambda x: x.score, reverse=True) -- - if anomalies: - logger.info('Sys io latency anomalies was detected.') -+ anomalies = sorted(anomalies, key=lambda x: x.score, reverse=True) - for anomaly in anomalies: -- self.report(anomaly, kpi.entity_name, machine_id) -+ self.report(anomaly, machine_id) - -- def detect_features(self, machine_id: str, top_n: int): -+ def find_cause_metrics(self, machine_id: str, filters: dict, top_n: int): -+ """Detects the abnormal features and reports the caused metrics""" - priorities = {f.metric: f.priority for f in self.features} - start, end = dt.last(minutes=6) -- time_series_list = [] -+ ts_list = [] - for metric in priorities.keys(): -- time_series = self.data_loader.get_metric( -+ _ts_list = self.data_loader.get_metric( - start, end, metric, label_name='machine_id', label_value=machine_id) -- time_series_list.extend(time_series) -+ for _ts in _ts_list: -+ if same_intersection_key_value(_ts.labels, filters): -+ ts_list.append(_ts) - - point_count = self.data_loader.expected_point_length(start, end) -- sr_model = SpectralResidual(12, 24, 50) -- -+ model = SpectralResidual(12, 24, 50) - result = [] -- for time_series in time_series_list: -- if len(time_series.values) < point_count * 0.9 or \ -- len(time_series.values) > point_count * 1.5: -+ for _ts in ts_list: -+ if len(_ts.values) < point_count * 0.9 or \ -+ len(_ts.values) > point_count * 1.5: - continue - -- values = time_series.values -- -- if all(x == values[0] for x in values): -+ if all(x == _ts.values[0] for x in _ts.values): - continue - -- scores = sr_model.compute_score(values) -- score = max(scores[-13:]) -- -+ score = max(model.compute_score(_ts.values)[-13:]) - if math.isnan(score) or math.isinf(score): - continue - -- result.append((time_series, score)) -+ result.append((_ts, score)) - - result = sorted(result, key=lambda x: x[1], reverse=True)[0: top_n] - result = sorted(result, key=lambda x: priorities[x[0].metric]) - - return result - -- def report(self, anomaly: Anomaly, entity_name: str, machine_id: str): -- feature_metrics = [f.metric for f in self.features] -+ def report(self, anomaly: Anomaly, machine_id: str): -+ """Reports the anomaly with it's caused metrics""" - description = {f.metric: f.description for f in self.features} -- cause_metrics = self.detect_features(machine_id, top_n=3) -+ cause_metrics = self.find_cause_metrics(machine_id, anomaly.labels, top_n=3) - cause_metrics = [ -- {'metric': cause[0].metric, -- 'label': cause[0].labels, -- 'score': cause[1], -- 'description': description.get(cause[0].metric, '').format( -- cause[0].labels.get('disk_name', ''), -- cause[0].labels.get('tgid', ''), -- cause[0].labels.get('comm', ''))} -+ { -+ 'metric': cause[0].metric, -+ 'label': cause[0].labels, -+ 'score': cause[1], -+ 'description': description.get(cause[0].metric, '').format( -+ cause[0].labels.get('disk_name', ''), -+ cause[0].labels.get('tgid', ''), -+ cause[0].labels.get('comm', '')) -+ } - for cause in cause_metrics] - timestamp = dt.utc_now() -- template = SysAnomalyTemplate(timestamp, machine_id, anomaly.metric, entity_name) -- template.labels = anomaly.labels -+ template = SysAnomalyTemplate(timestamp, machine_id, anomaly.metric, anomaly.entity_name) - self.anomaly_report.sent_anomaly(anomaly, cause_metrics, template) -diff --git a/anteater/module/sys_tcp_establish_detector.py b/anteater/module/sys_tcp_establish_detector.py -index 4b49b25..3ca61c5 100644 ---- a/anteater/module/sys_tcp_establish_detector.py -+++ b/anteater/module/sys_tcp_establish_detector.py -@@ -11,15 +11,17 @@ - # See the Mulan PSL v2 for more details. - # ******************************************************************************/ - -+from functools import reduce -+from typing import List -+ - import numpy as np - --from anteater.core.anomaly import Anomaly -+from anteater.core.anomaly import Anomaly, CauseMetric - from anteater.module.detector import Detector - from anteater.source.anomaly_report import AnomalyReport - from anteater.source.metric_loader import MetricLoader - from anteater.template.sys_anomaly_template import SysAnomalyTemplate --from anteater.utils.common import divide --from anteater.utils.data_load import load_kpi_feature -+from anteater.utils.common import divide, same_intersection_key_value - from anteater.utils.datetime import DateTimeManager as dt - from anteater.utils.log import logger - -@@ -36,69 +38,85 @@ class SysTcpEstablishDetector(Detector): - """ - - def __init__(self, data_loader: MetricLoader, anomaly_report: AnomalyReport): -- super().__init__(data_loader, anomaly_report) -- self.kpis, self.features = load_kpi_feature('sys_tcp_establish.json') -+ file_name = 'sys_tcp_establish.json' -+ super().__init__(data_loader, anomaly_report, file_name) - -- def execute_detect(self, machine_id: str): -+ self.mean = None -+ self.std = None -+ -+ def pre_process(self): -+ """Calculates ts values mean and std""" - kpi = self.kpis[0] -- start_30_minutes, _ = dt.last(minutes=30) -- start_3_minutes, end = dt.last(minutes=3) -+ look_back = kpi.params.get('look_back', None) - -- pre_ts = self.data_loader.get_metric( -- start_30_minutes, start_3_minutes, kpi.metric, label_name='machine_id', label_value=machine_id) -- pre_establish_time = [t.values[0] for t in pre_ts if t.values] -+ start, _ = dt.last(minutes=look_back) -+ mid, _ = dt.last(minutes=3) - -- ts = self.data_loader.get_metric( -- start_3_minutes, end, kpi.metric, label_name='machine_id', label_value=machine_id) -- establish_time = [t.values[0] for t in ts if t.values] -+ ts_list = self.data_loader.get_metric(start, mid, kpi.metric) -+ establish_time = reduce(lambda x, y: x + y, [list(set(_ts.values)) for _ts in ts_list]) - -- mean = np.mean(pre_establish_time) -- std = np.std(pre_establish_time) -+ self.mean = np.mean(establish_time) -+ self.std = np.std(establish_time) - -- outlier = [val for val in establish_time if abs(val - mean) > 3 * std] -+ def execute_detect(self, machine_id: str): -+ """Executes the detector based on machine id""" -+ kpi = self.kpis[0] -+ outlier_ratio_th = kpi.params.get('outlier_ratio_th', None) - -- if outlier and len(outlier) > len(ts) * 0.3: -+ start, end = dt.last(minutes=3) -+ ts_list = self.data_loader. \ -+ get_metric(start, end, kpi.metric, label_name='machine_id', label_value=machine_id) -+ establish_time = reduce(lambda x, y: x + y, [list(set(_ts.values)) for _ts in ts_list]) -+ -+ outlier = [val for val in establish_time if abs(val - self.mean) > 3 * self.std] -+ ratio = divide(len(outlier), len(establish_time)) -+ if outlier and ratio > outlier_ratio_th: -+ logger.info(f'Ratio: {ratio}, Outlier Ratio TH: {outlier_ratio_th}, ' -+ f'Mean: {self.mean}, Std: {self.std}') - logger.info('Sys tcp establish anomalies was detected.') -- if establish_time: -- percentile = divide(len(outlier), len(establish_time)) -- else: -- percentile = 0 - anomaly = Anomaly( - metric=kpi.metric, - labels={}, -- description=kpi.description.format(percentile, min(outlier))) -- self.report(anomaly, kpi.entity_name, machine_id) -+ entity_name=kpi.entity_name, -+ description=kpi.description.format(ratio, min(outlier))) -+ self.report(anomaly, machine_id) - -- def detect_features(self, machine_id: str): -+ def find_cause_metrics(self, machine_id: str, filters: dict) -> List[CauseMetric]: -+ """Detects the abnormal features and reports the caused metrics""" -+ priorities = {f.metric: f.priority for f in self.features} - start, end = dt.last(minutes=3) -- time_series_list = [] -- metrics = [f.metric for f in self.features] -- for metric in metrics: -- time_series = self.data_loader.get_metric( -- start, end, metric, label_name='machine_id', label_value=machine_id) -- time_series_list.extend(time_series) -+ ts_list = [] -+ for metric in priorities.keys(): -+ _ts_list = self.data_loader.\ -+ get_metric(start, end, metric, label_name='machine_id', label_value=machine_id) -+ filtered_ts_list = self.filter_ts(_ts_list, filters) -+ ts_list.extend(filtered_ts_list) - - result = [] -- for ts in time_series_list: -- if ts.values and max(ts.values) > 0: -- result.append((ts, max(ts.values))) -+ for _ts in ts_list: -+ if _ts.values and max(_ts.values) > 0: -+ cause_metric = CauseMetric(ts=_ts, score=max(_ts.values)) -+ result.append(cause_metric) - -- result = sorted(result, key=lambda x: x[1], reverse=True) -+ result = sorted(result, key=lambda x: x.score, reverse=True) - - return result - -- def report(self, anomaly: Anomaly, entity_name: str, machine_id: str): -+ def report(self, anomaly: Anomaly, machine_id: str): -+ """Reports a single anomaly at each time""" - description = {f.metric: f.description for f in self.features} -- cause_metrics = self.detect_features(machine_id) -+ cause_metrics = self.find_cause_metrics(machine_id, anomaly.labels) - cause_metrics = [ -- {'metric': cause[0].metric, -- 'label': cause[0].labels, -- 'score': cause[1], -- 'description': description.get(cause[0].metric, '').format( -- cause[0].labels.get('ppid', ''), -- cause[0].labels.get('s_port', ''))} -+ { -+ 'metric': cause.ts.metric, -+ 'label': cause.ts.labels, -+ 'score': cause.score, -+ 'description': description.get(cause.ts.metric, '').format( -+ cause.ts.labels.get('ppid', ''), -+ cause.ts.labels.get('s_port', '')) -+ } - for cause in cause_metrics] -+ - timestamp = dt.utc_now() -- template = SysAnomalyTemplate(timestamp, machine_id, anomaly.metric, entity_name) -- template.labels = anomaly.labels -+ template = SysAnomalyTemplate(timestamp, machine_id, anomaly.metric, anomaly.entity_name) - self.anomaly_report.sent_anomaly(anomaly, cause_metrics, template) -diff --git a/anteater/module/sys_tcp_transmission_detector.py b/anteater/module/sys_tcp_transmission_detector.py -index 9af4760..ad383b4 100644 ---- a/anteater/module/sys_tcp_transmission_detector.py -+++ b/anteater/module/sys_tcp_transmission_detector.py -@@ -16,11 +16,13 @@ import math - from anteater.core.anomaly import Anomaly - from anteater.model.algorithms.spectral_residual import SpectralResidual - from anteater.model.slope import smooth_slope -+from anteater.model.smoother import conv_smooth -+from anteater.model.three_sigma import three_sigma - from anteater.module.detector import Detector - from anteater.source.anomaly_report import AnomalyReport - from anteater.source.metric_loader import MetricLoader - from anteater.template.sys_anomaly_template import SysAnomalyTemplate --from anteater.utils.data_load import load_kpi_feature -+from anteater.utils.common import divide - from anteater.utils.datetime import DateTimeManager as dt - from anteater.utils.log import logger - -@@ -31,40 +33,45 @@ class SysTcpTransmissionDetector(Detector): - """ - - def __init__(self, data_loader: MetricLoader, anomaly_report: AnomalyReport): -- super().__init__(data_loader, anomaly_report) -- self.kpis, self.features = load_kpi_feature('sys_tcp_transmission.json') -+ file_name = 'sys_tcp_transmission.json' -+ super().__init__(data_loader, anomaly_report, file_name) - - def execute_detect(self, machine_id: str): - for kpi in self.kpis: -- parameter = kpi.parameter -- start, end = dt.last(minutes=10) -- time_series_list = self.data_loader.get_metric( -- start, end, kpi.metric, label_name='machine_id', label_value=machine_id) -+ look_back = kpi.params.get('look_back', None) -+ box_pts = kpi.params.get('box_pts', None) -+ obs_size = kpi.params.get('obs_size', None) -+ outlier_ratio_th = kpi.params.get('outlier_ratio_th', None) - -- if not time_series_list: -+ start, end = dt.last(minutes=look_back) -+ ts_list = self.data_loader.\ -+ get_metric(start, end, kpi.metric, label_name='machine_id', label_value=machine_id) -+ -+ if not ts_list: - logger.warning(f'Key metric {kpi.metric} is null on the target machine {machine_id}!') - return - - point_count = self.data_loader.expected_point_length(start, end) - anomalies = [] -- threshold = parameter['threshold'] -- for time_series in time_series_list: -- if len(time_series.values) < point_count * 0.9 or len(time_series.values) > point_count * 1.5: -+ for _ts in ts_list: -+ if len(_ts.values) < point_count * 0.9 or len(_ts.values) > point_count * 1.5: - continue - -- if sum(time_series.values) == 0: -+ if sum(_ts.values) == 0: - continue - -- score = max(smooth_slope(time_series, windows_length=13)) -- -- if math.isnan(score) or math.isinf(score): -- continue -+ smoothed_val = conv_smooth(_ts.values, box_pts=box_pts) -+ outlier, mean, std = three_sigma(smoothed_val, obs_size=obs_size, method='min') -+ ratio = divide(len(outlier), obs_size) - -- if score > threshold: -+ if outlier and ratio >= outlier_ratio_th: -+ logger.info(f'Ratio: {ratio}, Outlier Ratio TH: {outlier_ratio_th}, ' -+ f'Mean: {mean}, Std: {std}') - anomalies.append( -- Anomaly(metric=time_series.metric, -- labels=time_series.labels, -- score=score, -+ Anomaly(metric=_ts.metric, -+ labels=_ts.labels, -+ score=ratio, -+ entity_name=kpi.entity_name, - description=kpi.description)) - - anomalies = sorted(anomalies, key=lambda x: x.score, reverse=True) -@@ -72,45 +79,45 @@ class SysTcpTransmissionDetector(Detector): - if anomalies: - logger.info('Sys io latency anomalies was detected.') - for anomaly in anomalies: -- self.report(anomaly, kpi.entity_name, machine_id) -+ self.report(anomaly, machine_id) - - def detect_features(self, machine_id: str, top_n=3): - priorities = {f.metric: f.priority for f in self.features} - start, end = dt.last(minutes=6) -- time_series_list = [] -+ ts_list = [] - for metric in priorities.keys(): -- time_series = self.data_loader.get_metric( -- start, end, metric, label_name='machine_id', label_value=machine_id) -- time_series_list.extend(time_series) -+ _ts = self.data_loader.\ -+ get_metric(start, end, metric, label_name='machine_id', label_value=machine_id) -+ ts_list.extend(_ts) - - point_count = self.data_loader.expected_point_length(start, end) -- sr_model = SpectralResidual(12, 24, 50) -+ model = SpectralResidual(12, 24, 50) - - result = [] -- for time_series in time_series_list: -- if len(time_series.values) < point_count * 0.9 or \ -- len(time_series.values) > point_count * 1.5: -+ for _ts in ts_list: -+ if len(_ts.values) < point_count * 0.9 or \ -+ len(_ts.values) > point_count * 1.5: - continue - -- values = time_series.values -+ values = _ts.values - - if all(x == values[0] for x in values): - continue - -- scores = sr_model.compute_score(values) -+ scores = model.compute_score(values) - score = max(scores[-13:]) - - if math.isnan(score) or math.isinf(score): - continue - -- result.append((time_series, score)) -+ result.append((_ts, score)) - - result = sorted(result, key=lambda x: x[1], reverse=True)[0: top_n] - result = sorted(result, key=lambda x: priorities[x[0].metric]) - - return result - -- def report(self, anomaly: Anomaly, entity_name: str, machine_id: str): -+ def report(self, anomaly: Anomaly, machine_id: str): - description = {f.metric: f.description for f in self.features} - cause_metrics = self.detect_features(machine_id, top_n=3) - cause_metrics = [ -@@ -124,6 +131,5 @@ class SysTcpTransmissionDetector(Detector): - cause[0].labels.get('server_port', ''))} - for cause in cause_metrics] - timestamp = dt.utc_now() -- template = SysAnomalyTemplate(timestamp, machine_id, anomaly.metric, entity_name) -- template.labels = anomaly.labels -+ template = SysAnomalyTemplate(timestamp, machine_id, anomaly.metric, anomaly.entity_name) - self.anomaly_report.sent_anomaly(anomaly, cause_metrics, template) -diff --git a/anteater/provider/base.py b/anteater/provider/base.py -index eda2d1b..35d1d1f 100644 ---- a/anteater/provider/base.py -+++ b/anteater/provider/base.py -@@ -56,16 +56,16 @@ class TimeSeriesProvider: - def fetch(url, params: Dict, **args) -> List: - """Fetches data from prometheus server by http request""" - try: -- response = requests.get(url, params, timeout=30, **args).json() -+ response = requests.get(url, params, timeout=30, **args) - except requests.RequestException as e: - logger.error(f"RequestException: {e}!") - return [] -- -+ response = response.json() - result = [] - if response and response.get("status") == 'success': - result = response.get('data', {}).get('result', []) - else: -- logger.error(f"PrometheusAdapter get data failed, " -+ logger.error(f"Prometheus get data failed, " - f"error: {response.get('error')}, query_url: {url}, params: {params}.") - - return result -@@ -87,17 +87,19 @@ class TimeSeriesProvider: - data = self.fetch(self.url, params, headers=headers) - - for item in data: -- zipped_values = list(zip(*item.get("values"))) -+ zipped_values = list(zip(*item.get('values'))) - time_stamps = list(zipped_values[0]) - values = [float(v) for v in zipped_values[1]] - -- key = tuple(sorted(item.get("metric").items())) -+ key = tuple(sorted(item.get('metric').items())) - if key in tmp_index: - result[tmp_index.get(key)].extend(time_stamps, values) - else: -+ labels = item.get('metric') -+ labels.pop('__name__', None) - time_series = TimeSeries( - metric, -- item.get("metric"), -+ labels, - time_stamps, - values) - tmp_index[key] = len(result) -diff --git a/anteater/source/anomaly_report.py b/anteater/source/anomaly_report.py -index 2205f44..41542b7 100644 ---- a/anteater/source/anomaly_report.py -+++ b/anteater/source/anomaly_report.py -@@ -48,6 +48,7 @@ class AnomalyReport: - entity_name = template.entity_name - labels = anomaly.labels - -+ template.labels = labels - template.entity_id = self.get_entity_id(machine_id, entity_name, labels, keys) - template.keys = keys - template.description = anomaly.description -diff --git a/anteater/template/template.py b/anteater/template/template.py -index 52befaa..d86a8cb 100644 ---- a/anteater/template/template.py -+++ b/anteater/template/template.py -@@ -21,8 +21,8 @@ class Template: - self.machine_id = machine_id - self.metric_id = metric_id - self.entity_name = entity_name -- self.labels = {} - -+ self.labels = {} - self.entity_id = "" - self.description = "" - self.cause_metrics = {} -diff --git a/anteater/utils/common.py b/anteater/utils/common.py -index d6c80ab..a99a1ef 100644 ---- a/anteater/utils/common.py -+++ b/anteater/utils/common.py -@@ -170,3 +170,15 @@ def divide(x, y): - return x / y - else: - return 0 -+ -+ -+def same_intersection_key_value(first: dict, second: dict): -+ """Checks there are same key value pairs between two dictionaries -+ intersections by the key -+ """ -+ same_keys = set(first.keys()) & set(second.keys()) -+ for key in same_keys: -+ if first[key] != second[key]: -+ return False -+ -+ return True -diff --git a/config/gala-anteater.yaml b/config/gala-anteater.yaml -index b771c16..a01eb47 100644 ---- a/config/gala-anteater.yaml -+++ b/config/gala-anteater.yaml -@@ -3,7 +3,7 @@ Global: - is_sys: false - - Kafka: -- server: "9.16.143.92" -+ server: "localhost" - port: "9092" - model_topic: "gala_anteater_hybrid_model" - model_group_id: "gala_anteater_1" -@@ -12,8 +12,8 @@ Kafka: - meta_entity_name: "sli" - - Prometheus: -- server: "10.137.17.122" -- port: "29090" -+ server: "localhost" -+ port: "9090" - steps: 5 - - Aom: -diff --git a/config/module/app_sli_rtt.json b/config/module/app_sli_rtt.json -index 4372f07..0744416 100644 ---- a/config/module/app_sli_rtt.json -+++ b/config/module/app_sli_rtt.json -@@ -6,9 +6,11 @@ - "entity_name": "sli", - "enable": true, - "description": "sli rtt 异常", -- "parameter": { -- "min_nsec": 200000000, -- "threshold": 0.8 -+ "params": { -+ "look_back": 10, -+ "box_pts": 3, -+ "obs_size": 25, -+ "outlier_ratio_th": 0.3 - } - }, - { -@@ -17,8 +19,11 @@ - "entity_name": "sli", - "enable": true, - "description": "sli tps 异常", -- "parameter": { -- "threshold": -0.5 -+ "params": { -+ "look_back": 10, -+ "box_pts": 3, -+ "obs_size": 25, -+ "outlier_ratio_th": 0.3 - } - } - ], -diff --git a/config/module/proc_io_latency.json b/config/module/proc_io_latency.json -index 05bcd81..967ee10 100644 ---- a/config/module/proc_io_latency.json -+++ b/config/module/proc_io_latency.json -@@ -6,8 +6,11 @@ - "entity_name": "thread", - "enable": true, - "description": "Process IO_wait time (unit: us)", -- "parameter": { -- "threshold": 0.5 -+ "params": { -+ "look_back": 10, -+ "box_pts": 3, -+ "obs_size": 25, -+ "outlier_ratio_th": 0.3 - } - }, - { -@@ -16,8 +19,11 @@ - "entity_name": "proc", - "enable": true, - "description": "I/O operation delay at the BIO layer (unit: us)", -- "parameter": { -- "threshold": 0.5 -+ "params": { -+ "look_back": 10, -+ "box_pts": 3, -+ "obs_size": 25, -+ "outlier_ratio_th": 0.3 - } - } - ], -diff --git a/config/module/sys_io_latency.json b/config/module/sys_io_latency.json -index 66e5c5a..f790192 100644 ---- a/config/module/sys_io_latency.json -+++ b/config/module/sys_io_latency.json -@@ -5,9 +5,12 @@ - "kpi_type": "", - "entity_name": "block", - "enable": true, -- "description": "Block I/O latency performance", -- "parameter": { -- "threshold": 0.5 -+ "description": "Block I/O latency performance is deteriorating!", -+ "params": { -+ "look_back": 20, -+ "box_pts": 7, -+ "obs_size": 25, -+ "outlier_ratio_th": 0.3 - } - } - ], -diff --git a/config/module/sys_tcp_establish.json b/config/module/sys_tcp_establish.json -index d695c2c..b6589ed 100644 ---- a/config/module/sys_tcp_establish.json -+++ b/config/module/sys_tcp_establish.json -@@ -6,7 +6,10 @@ - "entity_name": "tcp_link", - "enable": true, - "description": "RTT of syn packet(us): existing {:.0%} syn packets rtt are more than {:.0f} us", -- "parameter": {} -+ "params": { -+ "look_back": 30, -+ "outlier_ratio_th": 0.3 -+ } - } - ], - "Features": [ -diff --git a/config/module/sys_tcp_transmission.json b/config/module/sys_tcp_transmission.json -index 5347cc9..522056f 100644 ---- a/config/module/sys_tcp_transmission.json -+++ b/config/module/sys_tcp_transmission.json -@@ -6,8 +6,37 @@ - "entity_name": "tcp_link", - "enable": true, - "description": "Smoothed Round Trip Time(us)", -- "parameter": { -- "threshold": 0.5 -+ "params": { -+ "look_back": 10, -+ "box_pts": 3, -+ "obs_size": 25, -+ "outlier_ratio_th": 0.3 -+ } -+ }, -+ { -+ "metric": "gala_gopher_net_tcp_in_segs", -+ "kpi_type": "in_segs", -+ "entity_name": "tcp_link", -+ "enable": true, -+ "description": "Total number of segments received", -+ "params": { -+ "look_back": 10, -+ "box_pts": 3, -+ "obs_size": 25, -+ "outlier_ratio_th": 0.3 -+ } -+ }, -+ { -+ "metric": "gala_gopher_net_tcp_out_segs", -+ "kpi_type": "out_segs", -+ "entity_name": "tcp_link", -+ "enable": true, -+ "description": "Total number of segments sent", -+ "params": { -+ "look_back": 10, -+ "box_pts": 3, -+ "obs_size": 25, -+ "outlier_ratio_th": 0.3 - } - } - ], --- -2.37.0.windows.1 -