본문 바로가기

논문리뷰/Anomaly Detection

Unsupervised Anomaly Detection withGenerative Adversarial Networks to GuideMarker Discovery (AnoGAN) [IPMI 2017]

이 글은 Unsupervised Anomaly Detection withGenerative Adversarial Networks to GuideMarker Discovery (AnoGAN)논문을 참고하여 정리하였음을 먼저 밝힙니다. 논문에서 사용한 방법론을 간단하게 설명하고, pytorch 라이브러리를 이용하여 코드를 구현한 후 추가적으로 설명드리겠습니다. 혹시 제가 잘못 알고 있는 점이나 보안할 점이 있다면 댓글 부탁드립니다.


0. Summary

 

본 논문은 의학 도메인의 가장 큰 문제인 병리 데이터 부족 문제로 인한 지도 학습의 한계점에 해결하기 위해 DCGAN과 Anomaly detection을 같이 활용할 수 있는 방법론을 사용하였습니다. 즉 AnoGAN이라는 방법론을 제안한 페이퍼입니다.


1. Unsupervised Manifold Learning of Normal Anatomical Variability

 

우선 학습에 사용할 이미지는 정상 이미지에 해당하는 m개의 이미지만 사용합니다. 즉 학습에 사용할 각 이미지의 사이즈를 a x b라 정의하면 \(I_m \in R^{a \times b} \)의 이미지를 사용합니다. 또한 각 이미지당 무작위의 c x c size의 k개의 patches를 뽑아서 실제 인풋으로는 \(x_{k,m} \in \chi\)로 학습합니다. 

 

 

위의 그림에서 normal에 해당하는 파란색 부분을 \(\chi\) manifold라 말하며 정상 이미지가 가질 수 있는 가변적인 공간을 말합니다.

 

위의 수식은 GAN에서 사용하는 loss함수에 대한 정의로 D는 가짜 이미지와 진짜 이미지를 구분하는 discriminator를 말하며, G는 가짜 이미지를 만드는 generator입니다. x는 실제 이미지에서 얻은 patches이며 discriminator는 해당 patches를 실제 이미지인 1이라고 판단해야 합니다. 따라서 log함수를 씌워 잘 구별하면 loss함수가 감소하는 방향으로 학습하도록 설계되었습니다. z는 generator가 만든 가짜 이미지의 patches이며 discriminator가 진짜 이미지라고 생각해야 합니다. 즉 D(G(z))값이 1로 가길 원하며, log를 씌워 마찬가지로 loss함수가 감소하는 방향으로 학습하도록 설계되었습니다. 


2. Mapping new Images to the Latent Space

 

GAN만으로 annomaly detection을 해결하기에는 다음과 같은 문제점이 있습니다. Generator가 만든 이미지를 실제 이미지와 유사한 x로 매핑하는 함수를 G(z) = z -> x라고 가정했을 때, 해당 잠재 공간(latent space)에서 가까운 두 지점에 해당하는 이미지를 G함수를 통해 generate 한다면 두 이미지는 유사해야 합니다. 즉, smooth transitions 문제를 해결하기 위하여 residual loss를 제안했으며, 또한 생성한 이미지가 \(\chi\) manifold를 벗어나지 않도록 하는 discrimination loss를 같이 사용하도록 제안합니다.

 

1) Residual loss

 

위의 수식은 latent distribution을 잘 학습하도록 설계되었으며, 무작위로 \(z_{\gamma}\)를 뽑았을 때, 해당 값으로 생성한 이미지와 실제 이미지가 pixel wise 하게 얼마나 다른지로 학습하여 backpropagation을 진행하여 parameters를 업데이트합니다. 즉, 잠재 공간에서 생각해보면 유사한 이미지는 서로 가까이 존재하도록 학습한다고 생각할 수 있습니다.

 

2) Discrimination loss

 

위의 수식은 "Semantic image inpainting with perceptual and contextual loss" 페이퍼에서 사용한 loss function으로 \(\alpha\)는 target에 해당하며 G가 만든 이미지를 D가 1로 판단하도록 학습하도록 설계되었습니다. 그러나 classification에 도움이 된다는 문제점이 존재하여 저자는 feature mapping이라는 방법론을 사용하여 좀 더 위의 문제에 적합한 Loss function을 제안했습니다.

 

 

즉, 기존의 scalar 아웃풋이 아닌 f라는 feature mapping 함수를 사용하여 코드를 적용했으며, f는 discriminator의 중간 레이어의 아웃풋을 flatten한 함수를 말합니다. 코드에서는 fully connected layer이전까지의 layer를 f로 사용하였습니다.


3. Detection of Anomalies

 

 

따라서 다음과 같은 최종 loss function을 정의하여 사용하였습니다. R() 함수는 Residual loss이고, D() 함수는 위에서 정의한 feature mapping을 사용한 discriminator loss입니다.


4. Experiments

 

실험에서는 hyperparameter에 해당하는 \(\lambda\)값을 0.1, patch size를 64, 학습에 사용할 전체 patches 1,000,000개의 이미지를 추출하여 사용하였습니다.


5. Results

 

 

위의 (b)그림은 각 R, D loss function에 해당하는 ROC curve 이며, (c)그림은 residual score에 대한 분포, (d)그림은 discrimination score에 대한 분포를 말합니다.

 

 

\(P_D\)는 latent space를 skip한 결과입니다. 따라서 AnoGAN의 아이디어가 좋은 결과로 나타났음을 확인할 수 있습니다.


6. Code review

 

학습은 50 epochs만 진행하였습니다.

 

1) import module

 

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.animation as animation
import seaborn as sns
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from torchvision.utils import make_grid, save_image
import torchvision.utils as vutils
import warnings
warnings.filterwarnings('ignore')
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

 

2) dataset 설정

 

class anomaly_dataset(Dataset):
    def __init__(self, inputs, labels, transform=None):
        self.inputs = inputs
        self.labels = labels
        self.transform = transform

    def __len__(self):
        return len(self.inputs)

    def __getitem__(self, idx):
        inputs = np.transpose(self.inputs[idx], (2, 0, 1))
        labels = self.labels[idx]

        if self.transform:
            inputs = self.transform(inputs)

        return inputs, labels

'''
하이퍼 파라미터 설정
'''
epochs = 50
batch_size = 64
lr = 0.0002
ndf = 64
ngf = 64
latent_dim = 128
img_size = 64
channels = 3
n_critic = 5
split_rate = 0.8

'''
Dataset & loader 설정
'''

traindata = torchvision.datasets.CIFAR10(root='../data/', train=True, download=True)
testdata = torchvision.datasets.CIFAR10(root='../data/', train=False, download=True)

# Label 1을 Normal data, 나머지 0,2,3,4,5,6,7,8,9는 abnormal data
x_train_temp = torch.ByteTensor(traindata.data[torch.IntTensor(traindata.targets) == 1])
x_train_normal, x_valid_normal = x_train_temp.split((int(len(x_train_temp) * split_rate)), dim=0)

y_train_temp = torch.ByteTensor(traindata.targets)[torch.tensor(traindata.targets)==1]
y_train_normal, y_valid_normal = y_train_temp.split((int(len(y_train_temp) * split_rate)), dim=0)

train_cifar10 = anomaly_dataset(x_train_normal, y_train_normal, transform=transforms.Compose([
                                                                            transforms.ToPILImage(),
                                                                            transforms.Resize(img_size),
                                                                            transforms.ToTensor(),
                                                                            transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
                                                                            ]))
train_loader = DataLoader(train_cifar10, batch_size=batch_size, shuffle=True)

 

3) Generator & Discriminator

 

class Generator(nn.Module):
    def __init__(self, latent_dim=100, num_gf=32, channels=3, bias=True):
        '''
        latent_dim: Latent vector dimension
        num_gf: Number of Generator Filters
        channels: Number of Generator output channels
        정규분포로 noise 값을 받아 역 conv layer 통해 이미지를 만든다.
        '''
        super(Generator, self).__init__()
        self.layer = nn.Sequential(
                                   nn.ConvTranspose2d(latent_dim, num_gf*8, 4, 1, 0, bias=bias),
                                   nn.BatchNorm2d(num_gf*8),
                                   nn.ReLU(),
                                   
                                   nn.ConvTranspose2d(num_gf*8, num_gf*4, 4, 2, 1, bias=bias),
                                   nn.BatchNorm2d(num_gf*4),
                                   nn.ReLU(),
                                   
                                   nn.ConvTranspose2d(num_gf*4, num_gf*2, 4, 2, 1, bias=bias),
                                   nn.BatchNorm2d(num_gf*2),
                                   nn.ReLU(),
                                   
                                   nn.ConvTranspose2d(num_gf*2, num_gf, 4, 2, 1, bias=bias),
                                   nn.BatchNorm2d(num_gf),
                                   nn.ReLU(),

                                   nn.ConvTranspose2d(num_gf, channels, 4, 2, 1, bias=bias),
                                   nn.Tanh()
                                )

    def forward(self, z):
        z = self.layer(z)
        return z


class Discriminator(nn.Module):
    '''
    loss function에 사용할 feature mapping함수와 그대로 backpropagation에 사용할 부분을 나누어 준다.
    '''
    def __init__(self, num_df=32, channels=3, bias=True):
        super(Discriminator, self).__init__()
        self.feature_layer = nn.Sequential(
                                    nn.Conv2d(channels, num_df, 4, 2, 1, bias=bias),
                                    nn.BatchNorm2d(num_df),
                                    nn.LeakyReLU(0.2, inplace=True),

                                    nn.Conv2d(num_df, num_df*2, 4, 2, 1, bias=bias),
                                    nn.BatchNorm2d(num_df*2),
                                    nn.LeakyReLU(0.2, inplace=True),

                                    nn.Conv2d(num_df*2, num_df*4, 4, 2, 1, bias=bias),
                                    nn.BatchNorm2d(num_df*4),
                                    nn.LeakyReLU(0.2, inplace=True),

                                    nn.Conv2d(num_df*4, num_df*8, 4, 2, 1, bias=bias),
                                    nn.BatchNorm2d(num_df*8),
                                    nn.LeakyReLU(0.2, inplace=True),
                                    )

        self.dis_layer = nn.Sequential(nn.Conv2d(num_df*8, 1, 4, 1, 0, bias=bias),
                                       nn.Sigmoid()
                                    )

    def forward_features(self, x):
        features = self.feature_layer(x)
        return features
    
    def forward(self, x):
        features = self.forward_features(x)
        discrimination = self.dis_layer(features)
        return discrimination

def weights_init(m):
    '''
    weight 초기화
    '''
    classname = m.__class__.__name__
    if classname.find('Conv') != -1:
        torch.nn.init.normal_(m.weight, 0.0, 0.02)
    elif classname.find('BatchNorm') != -1:
        torch.nn.init.normal_(m.weight, 1.0, 0.02)
        torch.nn.init.zeros_(m.bias)

 

4) loss, optimizer, scheduler 설정

 

G = Generator(latent_dim=latent_dim, num_gf=ngf, channels=channels, bias=False).to(device)
G.apply(weights_init)
D = Discriminator(num_df=ndf, channels=channels, bias=False).to(device)
D.apply(weights_init)
criterion = nn.BCELoss()

optimizer_G = torch.optim.Adam(G.parameters(), lr=lr, weight_decay=1e-5, betas=(0.5, 0.999))
optimizer_D = torch.optim.Adam(D.parameters(), lr=lr, weight_decay=1e-5, betas=(0.5, 0.999))
scheduler_G = torch.optim.lr_scheduler.CosineAnnealingWarmRestarts(optimizer_G, T_0=15, T_mult=2)
scheduler_D = torch.optim.lr_scheduler.CosineAnnealingWarmRestarts(optimizer_D, T_0=15, T_mult=2)

 

5) DCGAN 학습

 

'''
학습
1 epoch당 discriminator 1번, generator 3번 학습한다.
mode collapse : 한쪽이 너무 잘맞추는 경우를 말한다.
'''
img_list = []
d_losses = []
g_losses = []
iters = 0
D.train()
G.train()
steps_per_epoch = len(train_loader)
for epoch in range(1, epochs+1):
    for i, (images, _) in enumerate(train_loader):
        # discriminator 학습
        real_images = images.to(device)
        batch_num = images.size(0)
        optimizer_D.zero_grad()
        real_output = D(real_images)
        real_label = torch.ones_like(real_output, device=device)
        
        z = torch.randn(batch_num, latent_dim, 1, 1, device=device) # 우선 무작위로 latent dim 설정
        fake_images = G(z) # 가짜 이미지 생성
        fake_output = D(fake_images.detach()) # discriminator의 판단(확률)
        fake_label = torch.zeros_like(fake_output, device=device) # 가짜 이미지이므로 모두 0처리

        real_lossD = criterion(real_output, real_label) # log(D(x))
        fake_lossD = criterion(fake_output, fake_label) # log((1-D(G(z))))
        D_loss = real_lossD + fake_lossD
        D_loss.backward()
        optimizer_D.step()

        P_real = real_output.mean().item() # Discriminator가 진짜 image를 진짜라고 판별한 확률
        P_fake = fake_output.mean().item() # Discriminator가 가짜 image를 진짜라고 판별한 확률

        for _ in range(3):
        # generator 학습
            optimizer_G.zero_grad()
            fake_images = G(z)
            fake_output = D(fake_images)

            G_loss = criterion(fake_output, torch.ones_like(fake_output, device=device)) # log(D(G(z)))
            G_loss.backward()
            optimizer_G.step()

        scheduler_D.step(epoch + i * steps_per_epoch)
        scheduler_G.step(epoch + i * steps_per_epoch)

        d_losses.append(D_loss.item())
        g_losses.append(G_loss.item())

        if (iters % 500 == 0) or ((epoch == epochs) and (i == len(train_loader)-1)):
            with torch.no_grad():
                fake_images = G(z).detach().cpu()
            img_list.append(vutils.make_grid(fake_images, padding=2, normalize=True))
        iters += 1

    if epoch % 5 == 0:
        print(f'Epoch {epoch}/{epochs} | D loss: {D_loss.item():.6f} | G loss: {G_loss.item():.6f} | P(real): {P_real:.4f} | P(fake): {P_fake:.4f}')

 

6) DCGAN 학습 결과

 

 

Discriminator가 진짜와 가짜를 잘 구분 못하도록 학습하였다.

 

 

7) Mapping to the latent space

 

def residual_loss(real_images, generated_images):
    '''
    각 픽셀별로 유사도를 측정하기 위해 실제이미지와 생성이미지의 차의 합
    '''
    subtract = real_images - generated_images

    return torch.sum(torch.abs(subtract))

def discriminator_loss(netD, real_images, generated_images):
    '''
    discriminator의 feature mapping을 받아 계산
    '''
    real_features = D.forward_features(real_images)
    generated_features = D.forward_features(generated_images)

    subtract = real_features - generated_features

    return torch.sum(torch.abs(subtract))

def anomaly_loss(residual_loss, d_loss, l=0.1):
    '''
    최종 사용할 loss식, lambda값은 논문과 동일하게 0.1로 설정하였다.
    '''
    return (1 - l) * residual_loss + l * d_loss
'''
x_test_normal: Traindata 중 valid용으로 빼놓은 정상 class data(여기선 class1)
x_train_abnormal: Traindata 중 비정상 class data (0, 2, 3 ~ 9)
x_test_total: Total Testset (모든 class 존재)
'''
x_train_abnormal = torch.ByteTensor(traindata.data[torch.IntTensor(traindata.targets) != 1])
y_train_abnormal = torch.ByteTensor(traindata.targets)[torch.tensor(traindata.targets) != 1]

x_test_total = torch.ByteTensor(testdata.data)
y_test_total = torch.ByteTensor(testdata.targets)

x_test = torch.cat([x_valid_normal, x_train_abnormal, x_test_total], dim=0)
y_test = torch.cat([y_valid_normal, y_train_abnormal, y_test_total], dim=0)

test_cifar10 = anomaly_dataset(x_test, y_test, transform=transforms.Compose([
                                                                            transforms.ToPILImage(),
                                                                            transforms.Resize(img_size),
                                                                            transforms.ToTensor(),
                                                                            transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
                                                                            ]))

test_loader = DataLoader(test_cifar10, batch_size=1, shuffle=False)
'''
학습
'''
latent_space = []
G.eval()
D.eval()

z = torch.randn(1, latent_dim, 1, 1, device=device, requires_grad=True)
optimizer_z = torch.optim.Adam([z], lr=lr)

latent_space = []
auc = []
for i, (images, _) in enumerate(test_loader):
    real_images = images.to(device)
    print(f'image{i+1}')

    for step in range(101):
        generated_images = G(z)

        optimizer_z.zero_grad()
        resi_loss = residual_loss(real_images, generated_images)
        disc_loss = discriminator_loss(D, real_images, generated_images)
        ano_loss = anomaly_loss(resi_loss, disc_loss, l=0.1)
        ano_loss.backward(retain_graph = True)
        optimizer_z.step()

        if step%100 == 0:
            loss = ano_loss.item()
            noises = torch.sum(z).item()
            print("[%d]\t loss_Ano:%.4f  Sum_of_z:%.4f" %(step,loss,noises))

            if step==100:
                latent_space.append(z.cpu().data.numpy())
    
    if i == 100 : # 테스트용으로 100개의 이미지에 대해서만 latent space를 조정해 보았다.
        assert False

 

각 이미지당 100번씩 조정하였으며, 시간관계상 100개의 이미지로만 latent space를 조정해 보았습니다.