2019 年 4 月

第 34 卷,第 4 期

[测试运行]

使用 PyTorch 执行神经异常情况检测

作者 James McCaffrey

James McCaffrey异常情况检测(亦称为“离群值检测”)是在数据集中查找罕见项的过程。例如,在服务器日志文件中发现恶意事件,以及查找欺诈性联机广告。

了解本文所述观点的一个好方法是查看图 1 中的演示程序。演示程序分析众所周知的已修改国家标准技术研究所 (MNIST) 数据集中包含 1,000 个项的子集。每个数据项都是手写数字 0 到 9 的 28x28(784 像素)灰度图像。完整的 MNIST 数据集包含 60,000 张定型图像和 10,000 张测试图像。

使用 Keras 执行 MNSIT 图像异常情况检测
图 1:使用 Keras 执行 MNSIT 图像异常情况检测

演示程序使用 PyTorch 代码库创建并定型 784-100-50-100-784 深度神经自动编码器。自动编码器是学习预测输入的神经网络。定型后,演示程序扫描 1,000 张图像,并查找最异常的一张图像(最异常意味着重构误差最高)。最异常的数字是 3,它看起来也可能是 8。

若要更好地理解本文,至少必须拥有中等或更高水平的 C 系列语言编程技能,并基本熟悉机器学习,但无需对自动编码器有任何了解。本文提供所有演示程序代码。也可以从随附的下载中获取代码和数据。为了尽可能地让主要思想清晰明确,已删除所有常见错误检查。

安装 PyTorch

PyTorch 是用于创建神经网络的相对较低级别代码库。它在功能方面与 TensorFlow 和 CNTK 大致相似。虽然 PyTorch 是用 C++ 编写的,但它有简化编程的 Python 语言 API。

PyTorch 安装主要分为两步。首先,安装 Python 和几个必需的辅助包(如 NumPy 和 SciPy)。然后,安装 PyTorch 作为 Python 附加包。尽管可以分别安装 Python 和运行 PyTorch 所必需的包,但最好安装 Python 发行版本,这个集合包含基础 Python 解释器和其他相互兼容的包。在演示程序中,我安装了 Anaconda3 5.2.0 发行版本,其中包含 Python 3.6.5。

安装 Anaconda 后,我转到了 pytorch.org 网站,并选择了 Windows OS、Pip 安装程序、Python 3.6 和非 CUDA GPU 版本对应的选项。这提供了指向相应 .whl(发音为“wheel”)文件的 URL,我将此文件下载到了本地计算机。如果是刚刚开始接触 Python 生态系统,可以将 Python .whl 文件看作类似于 Windows .msi 文件。在此示例中,我下载了 PyTorch 版本 1.0.0。我打开了命令行界面,转到了 .whl 文件的保存目录,并输入了以下命令:

pip install torch-1.0.0-cp36-cp36m-win_amd64.whl

演示程序

图 2 展示了完整的演示程序(为了节省空间,我进行了少量小幅编辑)。为了节省空间,我缩进了两个空格(而不是通常的四个空格)。请注意,Python 使用“\”字符作为续行符。我使用了记事本来编辑演示程序。我的大多数同事都更青睐更为复杂的编辑器,而我喜欢记事本“不留情面地”简单。

图 2:异常情况检测演示程序

# auto_anom_mnist.py
# PyTorch 1.0.0 Anaconda3 5.2.0 (Python 3.6.5)
# autoencoder anomaly detection on MNIST
import numpy as np
import torch as T
import matplotlib.pyplot as plt
# -----------------------------------------------------------
def display(raw_data_x, raw_data_y, idx):
  label = raw_data_y[idx]  # like '5'
  print("digit/label = ", str(label), "\n")
  pixels = np.array(raw_data_x[idx])  # target row of pixels
  pixels = pixels.reshape((28,28))
  plt.rcParams['toolbar'] = 'None'
  plt.imshow(pixels, cmap=plt.get_cmap('gray_r'))
  plt.show() 
# -----------------------------------------------------------
class Batcher:
  def __init__(self, num_items, batch_size, seed=0):
    self.indices = np.arange(num_items)
    self.num_items = num_items
    self.batch_size = batch_size
    self.rnd = np.random.RandomState(seed)
    self.rnd.shuffle(self.indices)
    self.ptr = 0
  def __iter__(self):
    return self
  def __next__(self):
    if self.ptr + self.batch_size > self.num_items:
      self.rnd.shuffle(self.indices)
      self.ptr = 0
      raise StopIteration  # ugh.
    else:
      result = self.indices[self.ptr:self.ptr+self.batch_size]
      self.ptr += self.batch_size
      return result
# -----------------------------------------------------------
class Net(T.nn.Module):
  def __init__(self):
    super(Net, self).__init__()
    self.layer1 = T.nn.Linear(784, 100)  # hidden 1
    self.layer2 = T.nn.Linear(100, 50)
    self.layer3 = T.nn.Linear(50,100)
    self.layer4 = T.nn.Linear(100, 784)
    T.nn.init.xavier_uniform_(self.layer1.weight)  # glorot
    T.nn.init.zeros_(self.layer1.bias)
    T.nn.init.xavier_uniform_(self.layer2.weight) 
    T.nn.init.zeros_(self.layer2.bias)
    T.nn.init.xavier_uniform_(self.layer3.weight) 
    T.nn.init.zeros_(self.layer3.bias)
    T.nn.init.xavier_uniform_(self.layer4.weight) 
    T.nn.init.zeros_(self.layer4.bias)
  def forward(self, x):
    z = T.tanh(self.layer1(x))
    z = T.tanh(self.layer2(z))
    z = T.tanh(self.layer3(z))
    z = T.tanh(self.layer4(z))  # consider none or sigmoid
    return z
# -----------------------------------------------------------
def main():
  # 0. get started
  print("Begin autoencoder for MNIST anomaly detection")
  T.manual_seed(1)
  np.random.seed(1)
  # 1. load data
  print("Loading MNIST subset data into memory ")
  data_file = ".\\Data\\mnist_pytorch_1000.txt"
  data_x = np.loadtxt(data_file, delimiter=" ",
    usecols=range(2,786), dtype=np.float32)
  labels = np.loadtxt(data_file, delimiter=" ",
    usecols=[0], dtype=np.float32)
  norm_x = data_x / 255
  # 2. create autoencoder model
  net = Net()
  # 3. train autoencoder model
  net = net.train()  # explicitly set
  bat_size = 40
  loss_func = T.nn.MSELoss()
  optimizer = T.optim.Adam(net.parameters(), lr=0.01)
  batcher = Batcher(num_items=len(norm_x),
    batch_size=bat_size, seed=1)
  max_epochs = 100
  print("Starting training")
  for epoch in range(0, max_epochs):
    if epoch > 0 and epoch % (max_epochs/10) == 0:
      print("epoch = %6d" % epoch, end="")
      print("  prev batch loss = %7.4f" % loss_obj.item())
    for curr_bat in batcher:
      X = T.Tensor(norm_x[curr_bat])
      optimizer.zero_grad()
      oupt = net(X)
      loss_obj = loss_func(oupt, X)  # note X not Y
      loss_obj.backward()
      optimizer.step()
  print("Training complete")
  # 4. analyze - find item(s) with large(st) error
  net = net.eval()  # not needed - no dropout
  X = T.Tensor(norm_x)  # all input item as Tensors
  Y = net(X)            # all outputs as Tensors
  N = len(data_x)
  max_se = 0.0; max_ix = 0
  for i in range(N):
    curr_se = T.sum((X[i]-Y[i])*(X[i]-Y[i]))
    if curr_se.item() > max_se:
      max_se = curr_se.item()
      max_ix = i
  raw_data_x = data_x.astype(np.int)
  raw_data_y = labels.astype(np.int)
  print("Highest reconstruction error is index ", max_ix)
  display(raw_data_x, raw_data_y, max_ix)
  print("End autoencoder anomaly detection demo ")
# -----------------------------------------------------------
if __name__ == "__main__":
  main()

演示程序先导入 NumPy、PyTorch 和 Matplotlib 包。Matplotlib 包用于直观显示模型找到的最异常数字。导入整个 PyTorch 包的另一种方法是,只导入必要的模块(例如选择导入 torch.optim)。

将数据加载到内存

处理原始 MNIST 数据相当困难,因为它是以专有的二进制格式保存。我编写了从 60,000 个定型项中提取前 1,000 项的实用程序。我将数据保存为“数据”子目录中的 mnist_pytorch_1000.txt。

生成的数据如下所示:

7 = 0 255 67 . . 123
2 = 113 28 0 . . 206
...
9 = 0 21 110 . . 254

每一行代表一个数字。每一行的第一个值是数字。第二个值是任意等于号字符,只是为了提高可读性。接下来的 28x28 = 784 值是介于 0 和 255 之间的灰度像素值。所有值都以一个空格字符分隔。图 3**** 展示了数据文件中索引为 [30] 的数据项,它为典型的数字“3”。

典型的 MNIST 数字
图 3:典型的 MNIST 数字

数据集通过以下语句加载到内存中:

data_file = ".\\Data\\mnist_pytorch_1000.txt"
data_x = np.loadtxt(data_file, delimiter=" ",
  usecols=range(2,786), dtype=np.float32)
labels = np.loadtxt(data_file, delimiter=" ",
  usecols=[0], dtype=np.float32)
norm_x = data_x / 255

请注意,数字/标签位于第 0 列中,784 像素值位于第 2 到 785 列中。将 1,000 张图像全部加载到内存后,用每个像素值除以 255,这样按比例缩小的像素值全都介于 0.0 和 1.0 之间,从而创建数据的规范化版本。

定义自动编码器模型

演示程序定义 784-100-50-100-784 自动编码器。虽然输入层和输出层中的节点数 (784) 取决于数据,但隐藏层数和每层中的节点数都是超参数,必须通过反复试验来确定。

演示程序使用程序定义的类 Net,定义自动编码器的层体系结构和输入输出机制。另一种方法是,使用 Sequence 函数直接创建自动编码器,例如:

net = T.nn.Sequential(
  T.nn.Linear(784,100), T.nn.Tanh(),
  T.nn.Linear(100,50), T.nn.Tanh(),
  T.nn.Linear(50,100), T.nn.Tanh(),
  T.nn.Linear(100,784), T.nn.Tanh())

权重初始化算法 (Glorot uniform)、隐藏层激活函数 (tanh) 和输出层激活函数 (tanh) 都是超参数。对于这个问题,由于所有输入值和输出值都介于 0.0 和 1.0 之间,因此逻辑函数 sigmoid 也非常适用于输出激活。

定型和评估自动编码器模型

演示程序使用以下语句准备定型:

net = net.train()  # explicitly set
bat_size = 40
loss_func = T.nn.MSELoss()
optimizer = T.optim.Adam(net.parameters(), lr=0.01)
batcher = Batcher(num_items=len(norm_x),
  batch_size=bat_size, seed=1)
max_epochs = 100

因为演示程序自动编码器不使用 Dropout 正则化或批正则化,所以没有必要将网络设置显式为定型模式,但在我看来,最好这样做。批大小 (40)、定型优化算法 (Adam)、初始学习速率 (0.01) 和最大时期数 (100) 都是超参数。如果是刚刚开始接触神经机器学习,你可能会认为“神经网络肯定有很多超参数”,这样的想法是对的。

程序定义的 Batcher 对象一次提供 40 个随机数据项索引,直到处理完所有 1,000 项(一个时期)。另一种方法是,使用 torch.utils.data 模块中的内置 Dataset 和 DataLoader 对象。

定型流程结构为:

for epoch in range(0, max_epochs):
  # print loss every 10 epochs
  for curr_bat in batcher:
    X = T.Tensor(norm_x[curr_bat])
    optimizer.zero_grad()
    oupt = net(X)
    loss_obj = loss_func(oupt, X)
    loss_obj.backward()
    optimizer.step()

每批项都是使用 Tensor 构造函数创建,此函数使用 torch.float32 作为默认数据类型。请注意,loss_func 函数将计算出的输出与输入进行比较,这样可以通过定型网络来预测输入值。

定型后,通常需要保存模型,但这有点超出本文的介绍范围。PyTorch 文档提供了一些实用示例,展示了如何以几种不同的方式保存定型后的模型。

使用自动编码器时,大多数情况下(包括此示例),都没有固有的模型准确度定义。必须确定计算出的输出值必须与相关输入值有多接近,才能算是正确预测,然后编写程序定义的函数来计算准确度指标。

使用自动编码器模型查找异常数据

定型自动编码器模型后,就要查找很难正确预测的数据项(相当于很难重构的项)。演示程序代码扫描所有 1,000 个数据项,并计算规范化输入值与计算出的输出值之间的均方误差,如下所示:

net = net.eval()  # not needed - no dropout
X = T.Tensor(norm_x)  # all input item as Tensors
Y = net(X)            # all outputs as Tensors
N = len(data_x)
max_se = 0.0; max_ix = 0
for i in range(N):
  curr_se = T.sum((X[i]-Y[i])*(X[i]-Y[i]))
  if curr_se.item() > max_se:
    max_se = curr_se.item()
    max_ix = i

计算最大均方误差 (max_se),并保存相关图像的索引 (max_ix)。若要查找重构误差最高的一项,另一种方法是保存所有均方误差,对它们进行排序,并返回前 n 项(其中 n 值取决于要调查的特定问题)。

找到最异常的一个数据项后,使用程序定义的显示函数显示它:

raw_data_x = data_x.astype(np.int)
raw_data_y = labels.astype(np.int)
print("Highest reconstruction error is index ", max_ix)
display(raw_data_x, raw_data_y, max_ix)

从原则上讲,像素值和标签值大多都会从类型 float32 转换为 int,因为程序定义的显示函数中的 Matplotlib imshow 函数可以接受两种数据类型中的任何一种。

总结

如本文所述,使用深度神经自动编码器执行异常情况检测并不是已成熟研究的技术。与大多数标准聚类分析技术相比,使用神经自动编码器的一大优势是,神经技术可以通过编码非数值数据来处理此类数据。大多数聚类分析技术都依赖数值度量值(如欧几里得距离);也就是说,源数据必须严格为数值。

另一种相关但很少探讨的异常情况检测技术是,为要调查的数据集创建自动编码器。然后,可使用标准算法(如 k-平均算法)对数据进行聚类分析,而不是通过重构误差来查找异常数据,因为最靠中心的隐藏层节点严格地采用每个数据项的数值表示形式。完成聚类分析后,可以查找数据项很少的聚类,也可以在聚类中查找距离聚类中心最远的数据项。这种方法的特征类似于神经词嵌入,后者是将词转换为数值向量,然后可用于计算词之间的距离度量值。


Dr.James McCaffrey 供职于华盛顿地区雷蒙德市沃什湾的 Microsoft Research。他参与开发过多个重要 Microsoft 产品(包括 Azure 和必应)。Dr.可通过 jamccaff@microsoft.com 与 McCaffrey 取得联系。

衷心感谢以下 Microsoft 技术专家对本文的审阅:Chris Lee 和 Ricky Loynd