Abril de 2019

Volumen 34, número 4

[Serie de pruebas]

Detección de anomalías neuronales mediante PyTorch

Por James McCaffrey

James McCaffreyLa detección de anomalías, también denominada detección de valores atípicos, es el proceso de búsqueda de elementos poco frecuentes en un conjunto de datos. Algunos ejemplos son la identificación de eventos maliciosos en un archivo de registro de servidor y la búsqueda de publicidad en línea fraudulenta.

Una buena manera de ver hacia dónde se dirige este artículo es echar un vistazo al programa de demostración de la figura 1. La demostración analiza un subconjunto de 1000 elementos del conocido conjunto de datos MNIST (Modified National Institute of Standards and Technology). Cada elemento de datos es una imagen en escala de grises de 28 x 28 (784 píxeles) de un dígito de cero a nueve escrito a mano. El conjunto de datos MNIST completo tiene 60 000 imágenes de entrenamiento y 10 000 imágenes de prueba.

Detección de anomalías de imagen de MNSIT mediante Keras
Figura 1 Detección de anomalías de imagen de MNSIT mediante Keras

El programa de demostración crea y entrena un autoencoder neuronal profundo 784-100-50-100-784 mediante la biblioteca de código de PyTorch. Un autoencoder es una red neuronal que aprende a predecir su entrada. Después del entrenamiento, la demostración examina 1000 imágenes y busca la imagen más anómala, donde más anómala significa mayor error de reconstrucción. El dígito más anómalo es un tres que podría parecer un ocho.

En este artículo, se supone que tiene conocimientos intermedios o altos de programación con un lenguaje de la familia C y que está familiarizado en cierta medida con el aprendizaje automático, pero no que no sepa nada acerca de autoencoders. En este artículo se presenta todo el código de demostración. El código y los datos también están disponibles en la descarga complementaria. Se han eliminado todas las comprobaciones de errores normales para mantener las ideas principales lo más claro posible.

Instalar PyTorch

PyTorch es una biblioteca de código de relativamente bajo nivel para crear redes neuronales. En términos de funcionalidad, es similar a TensorFlow y CNTK. PyTorch se escribe en C++, pero tiene una API de lenguaje Python para facilitar la programación.

La instalación de PyTorch incluye dos pasos principales. Primero, debe instalar Python y varios paquetes auxiliares necesarios, como NumPy y SciPy. Luego, debe instalar PyTorch como paquete de complementos de Python. Aunque es posible instalar Python y los paquetes necesarios para ejecutar PyTorch por separado, es mucho mejor instalar una distribución de Python, que es una colección que contiene el intérprete de Python básico y paquetes adicionales compatibles entre sí. Para mi demostración, he instalado la distribución Anaconda3 5.2.0, que contiene Python 3.6.5.

Después de instalar Anaconda, me dirigí al sitio web pytorch.org y seleccioné las opciones para el sistema operativo Windows, el instalador de Pip, Python 3.6 y ninguna versión de GPU de CUDA. Esto me devolvió una dirección URL que señalaba al archivo .whl (pronunciado "wheel") correspondiente, que descargué en mi máquina local. Si es nuevo en el ecosistema de Python, puede pensar en un archivo .whl de Python como algo similar a un archivo .msi de Windows. En mi caso, descargué la versión 1.0.0 de PyTorch. Abrí un shell de comandos, navegué al directorio que contiene el archivo .whl y escribí el comando:

pip install torch-1.0.0-cp36-cp36m-win_amd64.whl

Programa de demostración

El programa de demostración completo, con ediciones menores para ahorrar espacio, se presenta en la figura 2. Para ahorrar espacio, aplico una sangría de dos espacios, en lugar de los cuatro espacios habituales. Tenga en cuenta que Python utiliza el carácter “\” para la continuación de línea. Usé el Bloc de notas para editar mi programa. La mayoría de mis compañeros prefiere un editor más sofisticado, pero a mí me gusta la inmensa simplicidad del Bloc de notas.

Figura 2 Programa de demostración de detección de anomalías

# auto_anom_mnist.py
# PyTorch 1.0.0 Anaconda3 5.2.0 (Python 3.6.5)
# autoencoder anomaly detection on MNIST
import numpy as np
import torch as T
import matplotlib.pyplot as plt
# -----------------------------------------------------------
def display(raw_data_x, raw_data_y, idx):
  label = raw_data_y[idx]  # like '5'
  print("digit/label = ", str(label), "\n")
  pixels = np.array(raw_data_x[idx])  # target row of pixels
  pixels = pixels.reshape((28,28))
  plt.rcParams['toolbar'] = 'None'
  plt.imshow(pixels, cmap=plt.get_cmap('gray_r'))
  plt.show() 
# -----------------------------------------------------------
class Batcher:
  def __init__(self, num_items, batch_size, seed=0):
    self.indices = np.arange(num_items)
    self.num_items = num_items
    self.batch_size = batch_size
    self.rnd = np.random.RandomState(seed)
    self.rnd.shuffle(self.indices)
    self.ptr = 0
  def __iter__(self):
    return self
  def __next__(self):
    if self.ptr + self.batch_size > self.num_items:
      self.rnd.shuffle(self.indices)
      self.ptr = 0
      raise StopIteration  # ugh.
    else:
      result = self.indices[self.ptr:self.ptr+self.batch_size]
      self.ptr += self.batch_size
      return result
# -----------------------------------------------------------
class Net(T.nn.Module):
  def __init__(self):
    super(Net, self).__init__()
    self.layer1 = T.nn.Linear(784, 100)  # hidden 1
    self.layer2 = T.nn.Linear(100, 50)
    self.layer3 = T.nn.Linear(50,100)
    self.layer4 = T.nn.Linear(100, 784)
    T.nn.init.xavier_uniform_(self.layer1.weight)  # glorot
    T.nn.init.zeros_(self.layer1.bias)
    T.nn.init.xavier_uniform_(self.layer2.weight) 
    T.nn.init.zeros_(self.layer2.bias)
    T.nn.init.xavier_uniform_(self.layer3.weight) 
    T.nn.init.zeros_(self.layer3.bias)
    T.nn.init.xavier_uniform_(self.layer4.weight) 
    T.nn.init.zeros_(self.layer4.bias)
  def forward(self, x):
    z = T.tanh(self.layer1(x))
    z = T.tanh(self.layer2(z))
    z = T.tanh(self.layer3(z))
    z = T.tanh(self.layer4(z))  # consider none or sigmoid
    return z
# -----------------------------------------------------------
def main():
  # 0. get started
  print("Begin autoencoder for MNIST anomaly detection")
  T.manual_seed(1)
  np.random.seed(1)
  # 1. load data
  print("Loading MNIST subset data into memory ")
  data_file = ".\\Data\\mnist_pytorch_1000.txt"
  data_x = np.loadtxt(data_file, delimiter=" ",
    usecols=range(2,786), dtype=np.float32)
  labels = np.loadtxt(data_file, delimiter=" ",
    usecols=[0], dtype=np.float32)
  norm_x = data_x / 255
  # 2. create autoencoder model
  net = Net()
  # 3. train autoencoder model
  net = net.train()  # explicitly set
  bat_size = 40
  loss_func = T.nn.MSELoss()
  optimizer = T.optim.Adam(net.parameters(), lr=0.01)
  batcher = Batcher(num_items=len(norm_x),
    batch_size=bat_size, seed=1)
  max_epochs = 100
  print("Starting training")
  for epoch in range(0, max_epochs):
    if epoch > 0 and epoch % (max_epochs/10) == 0:
      print("epoch = %6d" % epoch, end="")
      print("  prev batch loss = %7.4f" % loss_obj.item())
    for curr_bat in batcher:
      X = T.Tensor(norm_x[curr_bat])
      optimizer.zero_grad()
      oupt = net(X)
      loss_obj = loss_func(oupt, X)  # note X not Y
      loss_obj.backward()
      optimizer.step()
  print("Training complete")
  # 4. analyze - find item(s) with large(st) error
  net = net.eval()  # not needed - no dropout
  X = T.Tensor(norm_x)  # all input item as Tensors
  Y = net(X)            # all outputs as Tensors
  N = len(data_x)
  max_se = 0.0; max_ix = 0
  for i in range(N):
    curr_se = T.sum((X[i]-Y[i])*(X[i]-Y[i]))
    if curr_se.item() > max_se:
      max_se = curr_se.item()
      max_ix = i
  raw_data_x = data_x.astype(np.int)
  raw_data_y = labels.astype(np.int)
  print("Highest reconstruction error is index ", max_ix)
  display(raw_data_x, raw_data_y, max_ix)
  print("End autoencoder anomaly detection demo ")
# -----------------------------------------------------------
if __name__ == "__main__":
  main()

Para empezar, el programa de demostración importa los paquetes de NumPy, PyTorch y Matplotlib. El paquete de Matplotlib se usa para mostrar visualmente el dígito más anómalo que encuentra el modelo. Una alternativa a la importación de todo el paquete de PyTorch es importar solo los módulos necesarios; por ejemplo, importar torch.optim como opción.

Carga de los datos en la memoria

Trabajar con datos de MNIST sin procesar es bastante difícil, ya que se guardan en un formato binario y propietario. Escribí una utilidad para extraer los 1000 primeros elementos de los 60 000 elementos de entrenamiento. Guardé los datos como mnist_pytorch_1000.txt en un subdirectorio de datos.

Los datos resultantes tienen el aspecto siguiente:

7 = 0 255 67 . . 123
2 = 113 28 0 . . 206
...
9 = 0 21 110 . . 254

Cada línea representa un dígito. El primer valor de cada línea es el dígito. El segundo valor es un carácter de signo igual arbitrario con fines de legibilidad. Los siguientes 28 x 28 = 784 valores son valores de píxeles de escala de grises entre cero y 255. Todos los valores se separan mediante un único carácter de espacio en blanco. En la figura 3 se muestra el elemento de datos en el índice [30] del archivo de datos, que, normalmente, es un dígito "3" típico.

Dígito de MNIST típico
Figura 3 Dígito de MNIST típico

El conjunto de datos se carga en memoria con estas instrucciones:

data_file = ".\\Data\\mnist_pytorch_1000.txt"
data_x = np.loadtxt(data_file, delimiter=" ",
  usecols=range(2,786), dtype=np.float32)
labels = np.loadtxt(data_file, delimiter=" ",
  usecols=[0], dtype=np.float32)
norm_x = data_x / 255

Observe que el dígito o etiqueta está en la columna cero y que los 784 valores de píxeles están en las columnas de 2 a 785. Después de todo, se cargan 1000 imágenes en memoria y se crea una versión normalizada de los datos dividiendo cada valor de píxel por 255 para que los valores de píxel escalados se encuentren entre 0,0 y 1,0.

Definición del modelo de autoencoder

El programa de demostración define un autoencoder 784-100-50-100-784. El número de nodos de las capas de entrada y salida (784) viene determinado por los datos, pero el número de capas ocultas y el número de nodos de cada capa son hiperparámetros que se deben determinar mediante prueba y error.

El programa de demostración usa una clase definida por el programa, Net, para definir la arquitectura de capas y el mecanismo de entrada y salida del autoencoder. Una alternativa es crear el autoencoder directamente mediante la función de secuencia; por ejemplo:

net = T.nn.Sequential(
  T.nn.Linear(784,100), T.nn.Tanh(),
  T.nn.Linear(100,50), T.nn.Tanh(),
  T.nn.Linear(50,100), T.nn.Tanh(),
  T.nn.Linear(100,784), T.nn.Tanh())

El algoritmo de inicialización de ponderación (Glorot uniforme), la función de activación de la capa oculta (tanh) y la función de activación de la capa de salida (tanh) son hiperparámetros. Dado que todos los valores de entrada y salida oscilan entre 0,0 y 1,0 para este problema, la logística sigmoide es una buena alternativa que explorar para la activación de salida.

Entrenamiento y evaluación del modelo de autoencoder

El programa de demostración prepara el aprendizaje con estas instrucciones:

net = net.train()  # explicitly set
bat_size = 40
loss_func = T.nn.MSELoss()
optimizer = T.optim.Adam(net.parameters(), lr=0.01)
batcher = Batcher(num_items=len(norm_x),
  batch_size=bat_size, seed=1)
max_epochs = 100

Dado que el autoencoder de demostración no usa la normalización de abandono o por lotes, no es necesario definir explícitamente la red en el modo de entrenamiento, pero, en mi opinión, hacerlo es una práctica recomendada. El tamaño del lote (40), el algoritmo de optimización del aprendizaje (Adam), la velocidad de aprendizaje inicial (0,01) y el número máximo de épocas (100) son hiperparámetros. Si no está familiarizado con el aprendizaje automático neuronal, quizás esté pensando que las redes neuronales tienen una gran cantidad de hiperparámetros. Está en lo cierto.

El objeto Batcher definido por el programa sirve los índices de 40 elementos de datos aleatorios a la vez hasta que se han procesado los 1000 elementos (una época). Un enfoque alternativo es usar los objetos Dataset y DataLoader integrados en el módulo torch.utils.data.

La estructura del proceso de entrenamiento es:

for epoch in range(0, max_epochs):
  # print loss every 10 epochs
  for curr_bat in batcher:
    X = T.Tensor(norm_x[curr_bat])
    optimizer.zero_grad()
    oupt = net(X)
    loss_obj = loss_func(oupt, X)
    loss_obj.backward()
    optimizer.step()

Cada lote de elementos se crea mediante el constructor de tensores, que usa torch.float32 como tipo de datos predeterminado. Tenga en cuenta que la función loss_func compara los resultados calculados para las entradas, lo que tiene el efecto de entrenar la red para predecir sus valores de entrada.

Después del entrenamiento, normalmente, querrá guardar el modelo, pero eso queda fuera del ámbito de este artículo. La documentación de PyTorch tiene buenos ejemplos que muestran cómo guardar un modelo entrenado de distintas formas.

Cuando se trabaja con autoencoders, en la mayoría de situaciones (incluido este ejemplo), no hay ninguna definición inherente de precisión del modelo. Debe determinar cuánto deben aproximarse los valores de salida calculados a los valores de entrada asociados para que se consideren una predicción correcta y, a continuación, escribir una función definida por el programa para calcular su métrica de precisión.

Uso del modelo de autoencoder para buscar datos anómalos

Una vez entrenado el modelo de autoencoder, la idea es buscar elementos de datos difíciles de predecir correctamente o, de forma equivalente, elementos difíciles de reconstruir. El código de demostración examina los 1000 elementos de datos y calcula la diferencia al cuadrado entre los valores de entrada normalizados y los valores de salida calculados, como se indica a continuación:

net = net.eval()  # not needed - no dropout
X = T.Tensor(norm_x)  # all input item as Tensors
Y = net(X)            # all outputs as Tensors
N = len(data_x)
max_se = 0.0; max_ix = 0
for i in range(N):
  curr_se = T.sum((X[i]-Y[i])*(X[i]-Y[i]))
  if curr_se.item() > max_se:
    max_se = curr_se.item()
    max_ix = i

Se calcula el error cuadrático máximo (max_se) y se guarda el índice de la imagen asociada (max_ix). Una alternativa a la búsqueda del único elemento con el mayor error de reconstrucción es guardar todos los errores cuadráticos, ordenarlos y devolver los elementos top-n especificados, donde el valor de n dependerá del problema concreto que esté investigando.

Una vez encontrado el elemento de datos más anómalo, se muestra mediante la función de presentación definida por el programa:

raw_data_x = data_x.astype(np.int)
raw_data_y = labels.astype(np.int)
print("Highest reconstruction error is index ", max_ix)
display(raw_data_x, raw_data_y, max_ix)

En general, los valores de píxel y etiqueta se convierten del tipo float32 a int como una cuestión de principio, ya que la función imshow de Matplotlib dentro de la función de presentación definida por el programa puede aceptar cualquier tipo de datos.

Resumen

La detección de anomalías mediante un autoencoder neuronal profundo, como se presenta en este artículo, no es una técnica bien investigada. Una gran ventaja de usar un autoencoder neuronal, en comparación con la mayoría de técnicas de agrupación en clústeres estándar es que las técnicas neuronales pueden controlar datos no numéricos mediante la codificación de los datos. La mayor parte de técnicas de agrupación en clústeres depende de una medida numérica, como la distancia euclidiana, lo que significa que los datos de origen deben ser estrictamente numéricos.

Una técnica relacionada, pero poco explorada para la detección de anomalías consiste en crear un autoencoder para el conjunto de datos que se está investigando. A continuación, en lugar de usar el error de reconstrucción para buscar datos anómalos, puede agruparlos en clúster con un algoritmo estándar como k-means, ya que los nodos de capas ocultos más internos contienen una representación estrictamente numérica de cada elemento de datos. Después de la agrupación en clústeres, puede buscar clústeres con muy pocos elementos de datos, o buscar elementos de datos en los clústeres más distantes del centroide del clúster. Este enfoque tiene características similares a la inclusión de palabras neuronales, donde las palabras se convierten en vectores numéricos que, a continuación, se pueden usar para calcular una medida de distancia entre palabras.


El Dr. James McCaffrey trabaja para Microsoft Research en Redmond, Washington. Ha colaborado en varios productos clave de Microsoft, como Azure y Bing. Puede ponerse en contacto con el Dr. McCaffrey en jamccaff@microsoft.com.

Gracias a los siguientes expertos técnicos de Microsoft por revisar este artículo: Chris Lee, Ricky Loynd