For AutoEncoder basics, see the companion post "pytorch实现基本AutoEncoder与案例" (implementing a basic AutoEncoder in PyTorch, with examples).
The idea behind filling missing values with an AutoEncoder is:
- Train an AutoEncoder on the complete (fully observed) part of the dataset
- Take the rows with missing entries, zero-fill the missing values first, and run them through the trained AutoEncoder to get a reconstruction (see the sketch after this list)
- Copy the reconstructed values back into the originally missing positions
The reasoning is that a model that can compress the data and still reconstruct it well must have captured the underlying regularities of the data; those same regularities can then be used to infer the missing values. This is where the AutoEncoder's effectiveness comes from.
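As a minimal sketch of steps two and three, assuming a trained `model` and a float tensor `x` of incomplete rows (the function name and signature here are illustrative, not from the original post):

import torch

def impute_with_autoencoder(model: torch.nn.Module, x: torch.Tensor) -> torch.Tensor:
    """Zero-fill NaNs, reconstruct, and keep reconstructions only at the holes."""
    mask = torch.isnan(x)                                 # remember the missing positions
    x_filled = torch.where(mask, torch.zeros_like(x), x)  # step 2: zero-fill
    with torch.no_grad():
        recon = model(x_filled)                           # reconstruct every feature
    return torch.where(mask, recon, x_filled)             # step 3: fill only the holes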
Example code:
import torch
import torch.nn as nn
import torch.utils.data as Data
import numpy as np
import pandas as pd
from sklearn.metrics import mean_squared_error
def get_train_data():
    """Build the data.
    :return full_x: complete rows with no missing values (used for training)
    :return lack_x: rows whose "miss_line" column is missing
    :return true_value: the original ground-truth values of the missing entries
    """

    def get_tensor_from_pd(dataframe_series) -> torch.Tensor:
        return torch.tensor(data=dataframe_series.values)

    import copy
    from sklearn.datasets import make_classification
    data_x, data_y = make_classification(n_samples=1000, n_classes=4, n_features=40, n_informative=4,
                                         random_state=0)  # 1000 samples, 40 features; data_y is unused here
    data_x = pd.DataFrame(data_x)
    data_x.columns = ["x{}".format(i + 1) for i in range(39)] + ["miss_line"]
    true_data = copy.deepcopy(data_x)
    # Drop 10% of the "miss_line" column to simulate missing values
    drop_index = data_x.sample(frac=0.1).index  # indices of the rows with a missing value
    data_x.loc[drop_index, "miss_line"] = np.nan
    true_value = true_data.loc[drop_index, 'miss_line']  # ground truth for the missing entries
    # Split the data: data_x holds all rows (both complete and incomplete)
    full_x = data_x.drop(drop_index)
    lack_x = data_x.loc[drop_index]
    return get_tensor_from_pd(full_x).float(), get_tensor_from_pd(lack_x).float(), true_value
class AutoEncoder(nn.Module):
    def __init__(self, input_size=300, hidden_layer_size=20):
        super().__init__()
        self.hidden_layer_size = hidden_layer_size
        # The input and output dimensions are the same
        self.input_size = input_size
        self.output_size = input_size
        self.encode_linear = nn.Linear(self.input_size, hidden_layer_size)
        self.decode_linear = nn.Linear(hidden_layer_size, self.output_size)
        self.relu = nn.ReLU()
        self.sigmoid = nn.Sigmoid()

    def forward(self, input_x):
        # encode
        encode_linear = self.encode_linear(input_x)
        encode_out = self.relu(encode_linear)
        # decode
        decode_linear = self.decode_linear(encode_out)
        # Note: sigmoid bounds the outputs to (0, 1); since these features are
        # unbounded, a linear output layer would usually fit them better
        predictions = self.sigmoid(decode_linear)
        return predictions
if __name__ == '__main__':
    # Get the data
    print("Training the AutoEncoder")
    full_data, lack_data, true_x = get_train_data()
    train_loader = Data.DataLoader(
        dataset=Data.TensorDataset(full_data),  # data wrapped in Data.TensorDataset() can have any shape
        batch_size=20,  # batch size
        shuffle=True,  # shuffling the training data is usually a good idea
        num_workers=4,  # number of subprocesses used to load the data
    )
    # The three modelling essentials: loss, optimizer, epochs
    model = AutoEncoder(input_size=full_data.size()[1])  # model
    loss_function = nn.MSELoss()  # loss
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)  # optimizer
    epochs = 100
    # Training
    model.train()
    for i in range(epochs):
        epoch_loss = []
        for seq in train_loader:
            seq = seq[0]  # TensorDataset yields a one-element tuple
            # (the training rows are complete, so no NaN filling is needed here)
            optimizer.zero_grad()
            y_pred = model(seq).squeeze()  # forward pass; drop any size-1 dimensions
            single_loss = loss_function(y_pred, seq)
            single_loss.backward()
            optimizer.step()
            epoch_loss.append(float(single_loss.detach()))
        print("EPOCH", i, "LOSS: ", np.mean(epoch_loss))
    # Fill the missing values
    lack_loader = Data.DataLoader(
        dataset=Data.TensorDataset(lack_data),  # data wrapped in Data.TensorDataset() can have any shape
        batch_size=20,  # batch size
        shuffle=False,  # keep the original order so predictions line up with true_x
        num_workers=4,  # number of subprocesses used to load the data
    )
    model.eval()
    pred_lack = np.array([])
    for seq in lack_loader:
        seq = seq[0]
        # In each seq, the last column seq[:, -1] is the missing "miss_line" value
        seq = torch.where(torch.isnan(seq), torch.full_like(seq, 0), seq)  # zero-fill the missing values
        with torch.no_grad():
            y_pred = model(seq).squeeze()  # reconstruction; drop any size-1 dimensions
        lack_pred = y_pred[:, -1]  # the AutoEncoder's estimate of the missing values
        pred_lack = np.append(pred_lack, lack_pred.numpy())
    print("MSE of the imputed values:", mean_squared_error(true_x, pred_lack))
Output:
EPOCH 0 LOSS: 1.458275842666626
EPOCH 1 LOSS: 1.3739446348614164
EPOCH 2 LOSS: 1.2893095705244275
EPOCH 3 LOSS: 1.2242507404751248
EPOCH 4 LOSS: 1.1825604584481981
......
EPOCH 95 LOSS: 0.9070883777406481
EPOCH 96 LOSS: 0.9068020873599583
EPOCH 97 LOSS: 0.9063400851355659
EPOCH 98 LOSS: 0.9060421744982402
EPOCH 99 LOSS: 0.9056544555558099
预测结果的MSE: 1.2656838427106964
Clearly 100 epochs is not enough for this AutoEncoder; in general you keep training until the loss drops to an acceptable level, as in the early-stopping sketch below.
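One way to automate "train until the loss is acceptable" is simple early stopping on the training loss. The sketch below reuses `model`, `optimizer`, `loss_function`, and `train_loader` from the script above; the `patience` and `min_delta` values are illustrative assumptions, not part of the original post:

best_loss, bad_epochs = float("inf"), 0
patience, min_delta = 10, 1e-4  # assumed: stop after 10 epochs without a 1e-4 improvement
epoch = 0
while bad_epochs < patience:
    epoch_losses = []
    for (seq,) in train_loader:
        optimizer.zero_grad()
        loss = loss_function(model(seq), seq)
        loss.backward()
        optimizer.step()
        epoch_losses.append(float(loss.detach()))
    mean_loss = float(np.mean(epoch_losses))
    print("EPOCH", epoch, "LOSS:", mean_loss)
    if mean_loss < best_loss - min_delta:
        best_loss, bad_epochs = mean_loss, 0  # still improving: reset the counter
    else:
        bad_epochs += 1  # no meaningful improvement this epoch
    epoch += 1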