본문 바로가기
DeepLearning Framework & Coding/Pytorch

[Pytorch-11] Auto Encoder

by 노마드공학자 2021. 8. 14.

[pytorch 따라하기-1] 구글 Colab에 pytorch 세팅하기 https://limitsinx.tistory.com/136

[pytorch 따라하기-2] Tensor생성 및 Backward https://limitsinx.tistory.com/137 

[pytorch 따라하기-3] 경사하강법을 통한 선형회귀 구현 https://limitsinx.tistory.com/138

[pytorch 따라하기-4] 인공신경망(ANN) 구현 https://limitsinx.tistory.com/139

[pytorch 따라하기-5] 합성곱신경망(CNN) 구현 https://limitsinx.tistory.com/140

[pytorch 따라하기-6] Neural Style Transfer 구현 https://limitsinx.tistory.com/141

[pytorch 따라하기-7] pix2pix 구현 https://limitsinx.tistory.com/142

[pytorch 따라하기-8] DC-GAN(Deep Convolutional GAN) 구현 https://limitsinx.tistory.com/143 

[pytorch 따라하기-9] LSTM을 통한 시계열데이터 예측모델 구현 https://limitsinx.tistory.com/144

[pytorch-10] One Class SVM(Support Vector Machine, OC-SVM), SVDD https://limitsinx.tistory.com/147

 

※이 전글에서 정리한 코드/문법은 재설명하지 않으므로, 참고부탁드립니다

※해당 글은 PC에서 보기에 최적화 되어있습니다.


Auto Encoder?

 

오토인코더란 인풋으로 넣은 결과값이 아웃풋으로 그대로 나오는걸 의미합니다.

 

출처 : wikidocs, 오토인코더를 위한 딥러닝

 

저는 처음 오토인코더라는 분야를 접했을때 의문이 들었습니다.

 

"딥러닝을 사용하는건 알겠는데, Input이랑 output이 똑같이 나오는게 무슨의미가있지??"

"아무리 완벽한 시스템이라도 Input대비 일정부분 Data loss가 발생한채 나오는 output이 무슨의미가있을까?"

 

출처 : https://brunch.co.kr/@hansungdev/31

 

제 연구분야에서는 시계열(Time series)데이터를 주로 다루는데요

특히, 시계열 데이터를 통한 Anomaly Detection이 중요한 분야입니다.

즉, 이미지같은 학습이 아니라 센싱 및 추정값을 통해 얻어지는 빅데이터를 분석하여 현재 상태가 정상이냐 아니냐를 판단하는 일을 하고있습니다.

 

Anomaly Detection를 빅데이터 및 딥러닝으로 해결하고자 할때는 큰 문제가 있습니다.

공장에서 불량품 판별을 해본다고해보시죠

100만개의 양품중, 10개도 안되는 불량품이 나올때도 있습니다. 즉, 비정상데이터와 정상데이터의 갯수가 불균형이라는 문제점이 발생하죠

이런 경우에 일반적인 딥러닝(Classification)으로 학습을 시킨다면, 결과는 양품쪽에 Overfitting되어 잘 찾아내지 못하게 됩니다.

 

출처 : https://mc.ai/auto-encoder-in-biology/

 

이런 경우를 오토인코더로 해결해보고자 연구자들이 시도하게 됩니다.

 

오토인코더의 전체적인 흐름은 다음과 같습니다.

 

① Encoder : 원본데이터의 특징을 추출

② Latent Vector : Encoder를 통해 얻어낸 특징들

③ Decoder : Latent Vector를 통해 다시 원본데이터형태로 반환

④ Decoding한 결과값과 원본데이터의 차이를 Loss로보고 epoch 반복

 

이렇게 ①~④가 계속 반복되며 최적화를 이루어가는 형태입니다.

 

따라서 100만개의 정상데이터를 오토인코더로 학습을 시키게되면 Loss값이 0으로 수렴하게 되겠죠?

이해하기 쉽게 Loss가 0이되는 오토인코더를 만들었다고 가정해보시죠!

 

그럼 정상데이터를 넣었을때 정상데이터가 동일하게 나오도록 만들어진 시스템에, 비정상 데이터를 넣게되면

Encoding과 Decoding시 뭔가 다른 특징을 뽑아내고 다른 결과값이 나오겠죠?

그럼 비정상데이터가 오토인코더를 통과할시 원본그대로 나오진 않을것입니다. Loss가 반드시 발생하게 됩니다.

이런 경우, 비정상이라고 판단을 하는 원리입니다.

(여기서 Loss가 얼마이상일때 비정상으로 볼것이냐?(Threshold)는 사용자가 임의로 잡아줄 수 있습니다.)

 

Encoder와 Decoder은 특징을 뽑고 다시 재변환하는 딥러닝 분야이기에 각각 다른 딥러닝모델을 써줄 수 있습니다.

CNN - LSTM/ LSTM-LSTM/ GRU - ... 본인이 다루고 있는 분야에 적합한 조합을 사용하시면 됩니다!

 

출처 : https://www.atlantis-press.com/journals/jrnal/125935236/view

 

시계열데이터를 학습하시는 경우 FFT나 Laplace,Z같이 한번 Domain을 바꾼 데이터로 오토인코딩을 해보면 경우에 따라 의외로 특징이 잘잡히는 경우도 있으니 한번 추천드립니다!

 

코드는 가장 기본인 일반 뉴럴네트워크(DNN)로 진행해보았습니다.

 

 

코드

 

# Created by Hyunjun,JANG

# limitsinx.tistory.com

# Last revision date : 2021.08.11

 

import numpy as np

import pandas as pd

import pandas_datareader.data as pdr

import matplotlib.pyplot as plt

import datetime

import torch

import torch.nn as nn

from torch.autograd import Variable

import torch.optim as optim

from torch.utils.data import Dataset, DataLoader

from sklearn.preprocessing import StandardScaler, MinMaxScaler

 

# GPU setting

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

#print(torch.cuda.get_device_name(0)) #Google colab = TESLA T4

 

 

# Random seed to make results deterministic and reproducible

torch.manual_seed(0)

 

# scaling function for input data

def minmax_scaler(data):

    numerator = data - np.min(data, 0)

    denominator = np.max(data, 0) - np.min(data, 0)

    return numerator / (denominator + 1e-7)

 

# Do not use in DNN-AE

# make dataset to input

def build_dataset(time_seriesseq_length):

    dataX = []

    dataY = []

    for i in range(0, len(time_series) - seq_length):

        _x = time_series[i:i + seq_length, :]

        _y = time_series[i + seq_length, [-1]]  # Next close price

        print(_x, "->", _y)

        dataX.append(_x)

        dataY.append(_y)

    return np.array(dataX), np.array(dataY)

 

xy_train = np.loadtxt("/dir", delimiter=",")

xy_test = np.loadtxt("/dir",delimiter = ",")

 

xy_test_over20 = np.loadtxt("/dir",delimiter = ",")

 

#for de-normalization

test_for_denorm = xy_test[:,-1]

test_20_for_denorm = xy_test_over20[:,-1]

denorm_min = np.min(test_for_denorm)

denorm_max = np.max(test_for_denorm)

denorm_min_20 = np.min(test_20_for_denorm)

denorm_max_20 = np.max(test_20_for_denorm)

 

# hyper parameters

seq_length = 1

data_dim = 5

hidden_dim = 15

input_dim = 5

output_dim = 1



# split train-test set

train_size = int(len(xy_train) * 1)

train_set = xy_train[0:train_size]

 

test_size = int(len(xy_test) * 1)

test_set = xy_test[0:test_size-seq_length]

 

test_20_size = int(len(xy_test_over20))

test_20_set = xy_test_over20[0:test_20_size - seq_length]

#test_set = xy[train_size - seq_length:]

 

# scaling data

train_set = minmax_scaler(train_set)

test_set = minmax_scaler(test_set)

test_20_set = minmax_scaler(test_20_set)

 

# make train-test dataset to input

trainX, trainY = build_dataset(train_set, seq_length)

testX, testY = build_dataset(test_set, seq_length)

testX20, testY20 = build_dataset(test_20_set, seq_length)

 

# convert to tensor

trainX_tensor = torch.FloatTensor(trainX)

trainY_tensor = torch.FloatTensor(trainY)

 

testX_tensor = torch.FloatTensor(testX)

testY_tensor = torch.FloatTensor(testY)

 

testX20_tensor = torch.FloatTensor(testX20)

testY20_tensor = torch.FloatTensor(testY20)

 

class Net(nn.Module):

  def __init__(selfinput_dimoutput_dim: 

    super(Net, self).__init__()

    self.input_size = input_dim

    self.output_size = output_dim

    self.fc1 = nn.Linear(input_dim,64)

    self.fc2 = nn.Linear(64,128)

    self.fc3 = nn.Linear(128,256)

    self.fc4 = nn.Linear(256, 256)

    self.fc5 = nn.Linear(256,128)

    self.fc6 = nn.Linear(128,64)

    self.fc7 = nn.Linear(64,input_dim)

    self.relu = nn.ReLU()

 

  def forward(selfx):

    x = self.fc1(x)

    x = self.relu(x)

    x = self.fc2(x)

    x = self.relu(x)

    x = self.fc3(x)

    x = self.relu(x)

    x = self.fc4(x)

    x = self.relu(x)

    x = self.fc5(x)

    x = self.relu(x)

    x = self.fc6(x)

    x = self.relu(x)

    x = self.fc7(x)

 

    return x;

 

model = Net(input_dim,output_dim)

 

criterion = nn.MSELoss()

optimizer = optim.Adam(model.parameters(), lr = 0.005)

 

for epoch in range(500) : 

 

  total_loss = 0

  d = []

  

  optimizer.zero_grad()

  output = model(trainX_tensor)

  loss = criterion(output, trainX_tensor)

  loss.backward()

  optimizer.step()

 

  total_loss += loss.data.item()

 

  if (epoch+1) % 10 == 0:

    print(epoch+1, total_loss)

 

#검증

test_output = model(testX_tensor)

print("Test output : ",test_output)

print("testX_tensor : ",testX_tensor)

error = abs(test_output) - abs(testX_tensor)

print("error : ",error)

print((sum(abs(error))))

print(sum(abs(test_output)))

print((sum(abs(testX_tensor))))

 

test_outputs = test_output[:,:,4]

testX_tensors = testX_tensor[:,:,4]

testX_test_org = testX_tensors*(denorm_max - denorm_min) + denorm_min

test_denormal = test_outputs*(denorm_max-denorm_min) + denorm_min

error = testX_test_org - test_denormal

error = sum(abs(error)/len(test_outputs))

print("MAE : ", error)

 

#From tensor to numpy

error_np = torch.tensor(error,requires_grad=True)

print(error_np)

print(type(error_np))

error_np = error_np.detach().numpy()

print(error_np)

print(type(error_np))

print(error_np.shape)

error_np.dtype

error_np_float16 = error_np.astype(np.float16)

print(error_np_float16)

댓글