时间序列数据随处可见:网站每分钟的访问量、传感器读数、股票价格、人流计数、服务器 CPU 使用率,都是典型场景。
多数时候这类数据遵循某种规律。异常检测的目标就是找到规律被打破的那些时刻。
![]()
什么是时间序列数据中的异常?
异常指的是与正常行为产生明显偏离的数据点或数据序列。举几个例子:凌晨 3 点网站流量突然飙升;传感器因设备故障出现读数骤降;已关门的商店内人流量异常激增。
为什么时间序列异常检测很困难
时间序列数据天然包含趋势(缓慢的上升或下降)、季节性(日级或周级的周期模式)以及噪声(随机波动)。这三个成分叠加在一起,让"正常"本身就在不断变化。
一个值的高低本身不构成异常,它是否异常取决于出现的时间点。中午有 1000 个访客是正常的,凌晨 3 点有 1000 个访客就不正常了。
![]()
学习"正常"的样子
在检测异常之前,系统需要先建立对正常行为的认知——预期的数值范围、长期趋势走向以及重复出现的季节性模式。
不同的数据特征对应不同的检测策略。
![]()
方法 1:统计阈值法(Z-Score)
最简单的做法,假设数据服从正态分布。
import numpy as np
def z_score_anomaly(data, threshold=3):
mean = np.mean(data)
std = np.std(data)
z_scores = (data - mean) / std
anomalies = np.abs(z_scores) > threshold
return anomalies
适用场景:没有趋势的平稳数据。
方法 2:移动平均 + 残差
适用于带有平滑趋势的数据。
import pandas as pd
def moving_average_anomaly(series, window=10, threshold=2):
rolling_mean = series.rolling(window).mean()
residual = series - rolling_mean
std = residual.std()
return abs(residual) > threshold * std
它的优势在于,每个数据点比较的是自身的局部上下文而非全局均值。
方法 3:季节性分解
周期性模式明显的数据最适合用这个方法。
from statsmodels.tsa.seasonal import seasonal_decompose
def seasonal_anomaly(series, period=24):
result = seasonal_decompose(series, model='additive', period=period)
residual = result.resid
threshold = 3 * residual.std()
return abs(residual) > threshold
季节性分解把原始序列拆成趋势、季节性和残差三个分量,异常通常藏在残差里。
方法 4:机器学习(Isolation Forest)
不依赖任何分布假设,直接隔离罕见模式。
from sklearn.ensemble import IsolationForest
def isolation_forest_anomaly(data, contamination=0.02):
model = IsolationForest(contamination=contamination)
preds = model.fit_predict(data.reshape(-1, 1))
return preds == -1
适用场景:模式未知、数据不规则,或者多变量时间序列。
方法 5:深度学习(自编码器)
自编码器学习重建正常序列,重建误差高的部分即为异常。
import numpy as np
def reconstruction_error(original, reconstructed):
return np.mean((original - reconstructed) ** 2)
适合处理模式复杂、维度较多、存在长期依赖关系的时间序列。
示例:人流量分析
import pandas as pd
import numpy as np
from statsmodels.tsa.seasonal import seasonal_decompose
# 生成商店人流量数据(1 周,每小时)
hours = pd.date_range('2024-01-01', periods=24*7, freq='H')
hour_of_day = hours.hour
# 正常:上午 9 点到晚上 9 点繁忙,夜间安静
base = 100 + 80 * ((hour_of_day >= 9) & (hour_of_day <= 21))
traffic = pd.Series(base + np.random.normal(0, 10, len(hours)), index=hours)
# 注入异常
traffic.iloc[15] = 200 # 凌晨 3 点飙升(摄像头问题)
traffic.iloc[75] = 5 # 营业时间下降(故障)
# 检测
result = seasonal_decompose(traffic, model='additive', period=24)
residual = result.resid
anomalies = abs(residual) > 3 * residual.std()
print(f"Detected {anomalies.sum()} anomalies")
减少误报
误报是异常检测在生产环境中最常见的痛点。三种思路可以缓解。
调整灵敏度:控制标记比例:
model = IsolationForest(contamination=0.02) # 仅标记 2%
要求持续性:只有连续多个点都表现异常时才触发告警:
# 仅当异常持续 3 个及以上连续点时才标记
consecutive_count >= 3
集成投票:多种方法同时判断,取多数一致的结果:
# 投票:如果 2 个及以上方法一致则标记
votes = method1 + method2 + method3
anomalies = votes >= 2
总结
异常检测的核心不在于找出"奇怪的数字",而在于理解每个时间点上什么才算正常。先对数据做可视化探索,从移动平均或季节性分解入手;如果数据模式复杂,引入 Isolation Forest;生产系统中建议组合多种方法以降低误判。
异常检测要做的,是识别那些偏离了时间、趋势和行为规律的数据点。
https://avoid.overfit.cn/post/a6de4ac94dd64768a768593e39b6c7cb
by Bhargavi Guddati
特别声明:以上内容(如有图片或视频亦包括在内)为自媒体平台“网易号”用户上传并发布,本平台仅提供信息存储服务。
Notice: The content above (including the pictures and videos if any) is uploaded and posted by a user of NetEase Hao, which is a social media platform and only provides information storage services.