import torch
import torch.nn as nn
import numpy as np
import matplotlib.pyplot as plt
import torch.nn.functional as F
import warnings
import pandas as pd
import scipy.stats as st
# 设定随机种子,以确保实验的可重复性
torch.manual_seed(1)
np.random.seed(1)
# 定义LSTM模型
class LSTM(nn.Module):
hidden = None
def __init__(self, input_size, hidden_size, output_size, num_layers):
super().__init__()
self.hidden_size = hidden_size
self.num_layers = num_layers
self.train_on_gpu = torch.cuda.is_available()
# As batch_first=True, input: (batch_size, sequence_length, input_size)
self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
self.lstm2 = nn.LSTM(hidden_size, hidden_size, num_layers, batch_first=True)
self.fc = nn.Linear(hidden_size, hidden_size)
self.fcOut = nn.Linear(hidden_size, output_size)
self.drop = nn.Dropout(0.5)
# 可选操作,可以把下一行注释
self.apply(LSTM.init_weights)
def forward(self, x):
# 防止 loss.backward 报错
hidden = [each.data for each in self.hidden]
x, hidden = self.lstm(x, hidden)
x, self.hidden = self.lstm2(x, hidden)
x = x[:, -1, :]
x = F.relu(self.fc(x))
x = self.drop(x)
x = self.fcOut(x)
return x
def init_hidden(self, batch_size):
weight = next(self.parameters()).data
if self.train_on_gpu:
self.hidden = (weight.new(self.num_layers, batch_size, self.hidden_size).zero_().cuda(),
weight.new(self.num_layers, batch_size, self.hidden_size).zero_().cuda())
else:
self.hidden = (weight.new(self.num_layers, batch_size, self.hidden_size).zero_(),
weight.new(self.num_layers, batch_size, self.hidden_size).zero_())
@staticmethod
def init_weights(m):
if type(m) == nn.LSTM:
for name, param in m.named_parameters():
if 'weight_ih' in name:
torch.nn.init.orthogonal_(param.data)
elif 'weight_hh' in name:
torch.nn.init.orthogonal_(param.data)
elif 'bias' in name:
param.data.fill_(0)
elif type(m) == nn.Conv1d or type(m) == nn.Linear:
torch.nn.init.orthogonal_(m.weight)
m.bias.data.fill_(0)
class CustomLSTM:
X, Y, data, model, optimizer, criterion, result, result_, window, losses = [None] * 10
def __init__(self, data: np.array, window):
self.data = data
self.input_dim = self.init_data()
self.output_dim = self.input_dim
self.slice(window)
# data 可为一维数组或二维数组,强制把一维数组转化为二维数组
def init_data(self):
assert (length := len(self.data.shape)) in [1, 2]
if length == 1:
self.data = self.data[:, np.newaxis]
return len(self.data[0])
# 检查总数据大小是否能整除 batch
def check_batch(self, batch_size):
length = self.X.shape[0]
# 保证 batch_size 比总长度小
assert length >= batch_size
if batch_size * (length // batch_size) != length:
warnings.warn(f'数据大小为{length}, batch大小为{batch_size},无法整除,会损失{(length % batch_size) / length * 100}%数据', DeprecationWarning)
# 以 window 为窗口大小切片形成整个batch
def slice(self, window):
self.window = window
X, Y = [], []
for i in range(len(self.data) - window):
X.append(self.data[i:i + window])
Y.append(self.data[i + window])
X = np.array(X)
Y = np.array(Y)
X = torch.from_numpy(X).float() # (batch_size, sequence_length, input_size)
Y = torch.from_numpy(Y).float()
print(f"数据格式:X = {X.shape}, Y = {Y.shape}")
self.X = X
self.Y = Y
def re_slice(self, window):
self.X, self.Y, self.model, self.optimizer, self.criterion, self.result, self.result_, self.losses = [None] * 9
self.slice(window)
# 初始化 LSTM model
def init_lstm(self, hidden=64, lr=0.001, num_layers=1):
self.model = LSTM(self.input_dim, hidden, self.output_dim, num_layers)
self.optimizer = torch.optim.Adam(self.model.parameters(), lr=lr)
self.criterion = nn.MSELoss()
if torch.cuda.is_available():
self.model.cuda()
self.X.cuda()
self.Y.cuda()
# 总数据产生 batch 并可以进行 shuffle
@staticmethod
def iterate_batches(inputs, targets, batchsize, shuffle=True):
assert len(inputs) == len(targets)
if shuffle:
indices = np.arange(len(inputs))
np.random.shuffle(indices)
for start_idx in range(0, len(inputs) - batchsize + 1, batchsize):
if shuffle:
excerpt = indices[start_idx:start_idx + batchsize]
else:
excerpt = slice(start_idx, start_idx + batchsize)
yield inputs[excerpt], targets[excerpt]
# 开始训练
def train(self, num_epochs=100, batch_size=128, max_batch=False):
losses = []
if self.model is None:
raise ValueError("请先使用CustomLSTM.init_lstm初始化网络")
if max_batch:
batch_size = self.X.shape[0]
else:
self.check_batch(batch_size)
for epoch in range(num_epochs):
loss_all = 0
self.model.init_hidden(batch_size)
for index, (batch_x, batch_y) in enumerate(CustomLSTM.iterate_batches(self.X, self.Y, batch_size, shuffle=True)):
self.optimizer.zero_grad()
outputs = self.model(batch_x)
loss = self.criterion(outputs, batch_y)
loss.backward()
self.optimizer.step()
loss_all += loss.detach().cpu()
losses.append(loss_all / (index + 1))
if epoch % 20 == 0:
print('Epoch [{}/{}], Loss: {:.4f}'.format(epoch + 1, num_epochs, loss_all / (index + 1)))
self.losses = losses
self.predicted()
def plot_loss(self):
if self.losses is None:
raise ValueError("loss 不存在,请先进行训练")
plt.figure(figsize=(12, 6))
plt.plot(self.losses)
plt.xlabel("Epoch")
plt.ylabel("Loss")
plt.show()
# 预测之后 n 天的值
def predict(self, n):
# self.predicted()
self.model.init_hidden(1)
data = self.data[-self.window:, :].tolist()
y = []
for i in range(n):
x = torch.tensor(data).float().unsqueeze(0)
result = self.model.cpu()(x).tolist()
y.append(result[0])
data.append(result[0])
data.pop(0)
self.result = np.array(y)
return self.result
# 预测从 window_size 到结束数据的预测值,以便与真实值做比较
def predicted(self):
self.model.eval()
self.model.init_hidden(len(self.X))
with torch.no_grad():
predicted = self.model(self.X)
predicted = predicted.detach().cpu().numpy()
self.result_ = predicted
return self.result_
# 因为可以为二维数组即对多个变量进行预测,names即为每个变量的名字
def plot(self, names=None):
if self.result is None:
raise ValueError("请先使用CustomLSTM.predict")
if names is None:
names = [None] * len(self.data[0])
# further = self.predict(n)
x = np.arange(len(self.data))
x_further = np.arange(len(self.data), len(self.data) + len(self.result))
plt.figure(figsize=(12, 6))
for i in range(len(self.data[0])):
plt.plot(x, self.data, label=f'{names[0]} True Values')
plt.plot(x[self.window:], self.result_[:, i], label=f'{names[0]} Predictions')
plt.plot(x_further, self.result[:, i], label=f"{names[0]} Further Predictions")
plt.show()
# 画置信区间,demo 版本
# ToDo
def plot_confidence(self, index=0, alpha=0.05):
if self.result is None:
raise ValueError("请先使用CustomLSTM.predict")
plt.figure(figsize=(12, 6))
x = np.arange(len(self.data))
x_further = np.arange(len(self.data), len(self.data) + len(self.result))
y_true = self.data[self.window:, index].tolist()
plt.plot(y_true, label='True Values')
plt.plot(x_further, self.result[:, index], label="Further Predictions")
plt.plot(x[self.window:], self.result_[:, index], label='Predictions')
y_pred = self.result[:, index]
lower, upper = Utils.ci(y_true, y_pred, alpha=alpha)
plt.plot(y_pred, label='Predictions')
plt.fill_between(np.arange(), lower, upper, alpha=0.2, label='Confidence interval')
plt.legend()
plt.show()
# 打印 summary, 即 r^2 评价函数等
def summary(self):
if self.model is None:
raise ValueError("请先进行训练")
print("==========Summary Begin===========")
print("R2 =", score_r2 := Utils.r2(self.result_, self.data[self.window:, :]))
print("MSE =", score_mse := Utils.mse(self.result_, self.data[self.window:, :]))
print("RMSE =", score_rmse := np.sqrt(score_mse))
print("MAE =", score_mae := Utils.mae(self.result_, self.data[self.window:, :]))
print("===========Summary end============")
return score_r2, score_mse, score_rmse, score_mae
# 工具
class Utils:
# 置信区间,demo
# ToDo
@staticmethod
def ci(y_true, y_pred, alpha=0.05):
residuals = y_true - y_pred
n = len(residuals)
df = n - 1
t_value = st.norm.ppf(1 - alpha / 2, df)
std_err = np.std(residuals, ddof=1) / np.sqrt(n)
upper = residuals + t_value * std_err
lower = residuals - t_value * std_err
return lower, upper
# r2 评价函数
@staticmethod
def r2(y_pred, y_true):
return 1 - ((y_pred - y_true) ** 2).sum(axis=0) / ((y_true.mean(axis=0) - y_true) ** 2).sum(axis=0)
@staticmethod
def mse(y_pred, y_true):
return ((y_true - y_pred) ** 2).sum(axis=0) / len(y_pred)
@staticmethod
def rmse(y_pred, y_true):
return np.sqrt(((y_true - y_pred) ** 2).sum(axis=0) / len(y_pred))
@staticmethod
def mae(y_pred, y_true):
return (np.absolute(y_true - y_pred)).sum(axis=0) / len(y_true)
# 添加快捷打开文件操作
@staticmethod
def openfile(name):
file_type = name.split(".")[-1]
if file_type == "csv":
df = pd.read_csv(name, encoding='GBK')
elif file_type == "xlsx" or file_type == "xls":
df = pd.read_excel(name)
else:
raise TypeError(f"{name} 类型不是 csv, xls, xlsx")
# df = df[["列名字1", "列名字2"]]
print(df)
return np.array(df)
def load_data():
# 以下注释代码为快捷打开操作
# return Utils.openfile(file_location)
data = np.sin(np.arange(100) * np.pi / 50) + np.random.randn(100) * 0.1
return data
if __name__ == "__main__":
# 加载数据
data = load_data()
# 初始化网络
window_size = 10
batch = 100
lstm = CustomLSTM(data, window_size)
lstm.init_lstm(hidden=64, lr=0.001, num_layers=1)
# 训练网络
# max_batch 表示是否以整一个数据作为 batch 不做分割
lstm.train(num_epochs=1000, max_batch=True)
# lstm.train(num_epochs=50, batch_size=30, max_batch=False)
# 调整窗口大小重新训练
# lstm.re_slice(20)
# lstm.init_lstm(hidden=64, lr=0.001, num_layers=1)
# lstm.train(num_epochs=50, batch_size=40, max_batch=False)
# 打印 summary
r2, mse, rmse, mae = lstm.summary()
# 预测之后 100 步数据
lstm.predict(100)
# 画图
lstm.plot_loss()
lstm.plot(['data'])
# lstm.plot_confidence(index=0)