因为在数学建模中用的很麻烦,写了个简易易使用版本。
以下代码为最终成品代码 demo 版本,不推荐使用,最新版本可点下载链接或可见 csdn。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 |
import torch import torch.nn as nn import numpy as np import matplotlib.pyplot as plt import torch.nn.functional as F import warnings import pandas as pd import scipy.stats as st # 设定随机种子,以确保实验的可重复性 torch.manual_seed(1) np.random.seed(1) # 定义LSTM模型 class LSTM(nn.Module): hidden = None def __init__(self, input_size, hidden_size, output_size, num_layers): super().__init__() self.hidden_size = hidden_size self.num_layers = num_layers self.train_on_gpu = torch.cuda.is_available() # As batch_first=True, input: (batch_size, sequence_length, input_size) self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True) self.lstm2 = nn.LSTM(hidden_size, hidden_size, num_layers, batch_first=True) self.fc = nn.Linear(hidden_size, hidden_size) self.fcOut = nn.Linear(hidden_size, output_size) self.drop = nn.Dropout(0.5) # 可选操作,可以把下一行注释 self.apply(LSTM.init_weights) def forward(self, x): # 防止 loss.backward 报错 hidden = [each.data for each in self.hidden] x, hidden = self.lstm(x, hidden) x, self.hidden = self.lstm2(x, hidden) x = x[:, -1, :] x = F.relu(self.fc(x)) x = self.drop(x) x = self.fcOut(x) return x def init_hidden(self, batch_size): weight = next(self.parameters()).data if self.train_on_gpu: self.hidden = (weight.new(self.num_layers, batch_size, self.hidden_size).zero_().cuda(), weight.new(self.num_layers, batch_size, self.hidden_size).zero_().cuda()) else: self.hidden = (weight.new(self.num_layers, batch_size, self.hidden_size).zero_(), weight.new(self.num_layers, batch_size, self.hidden_size).zero_()) @staticmethod def init_weights(m): if type(m) == nn.LSTM: for name, param in m.named_parameters(): if 'weight_ih' in name: torch.nn.init.orthogonal_(param.data) elif 'weight_hh' in name: torch.nn.init.orthogonal_(param.data) elif 'bias' in name: param.data.fill_(0) elif type(m) == nn.Conv1d or type(m) == nn.Linear: torch.nn.init.orthogonal_(m.weight) m.bias.data.fill_(0) class CustomLSTM: X, Y, data, model, optimizer, criterion, result, result_, window, losses = [None] * 10 def __init__(self, data: np.array, window): self.data = data self.input_dim = self.init_data() self.output_dim = self.input_dim self.slice(window) # data 可为一维数组或二维数组,强制把一维数组转化为二维数组 def init_data(self): assert (length := len(self.data.shape)) in [1, 2] if length == 1: self.data = self.data[:, np.newaxis] return len(self.data[0]) # 检查总数据大小是否能整除 batch def check_batch(self, batch_size): length = self.X.shape[0] # 保证 batch_size 比总长度小 assert length >= batch_size if batch_size * (length // batch_size) != length: warnings.warn(f'数据大小为{length}, batch大小为{batch_size},无法整除,会损失{(length % batch_size) / length * 100}%数据', DeprecationWarning) # 以 window 为窗口大小切片形成整个batch def slice(self, window): self.window = window X, Y = [], [] for i in range(len(self.data) - window): X.append(self.data[i:i + window]) Y.append(self.data[i + window]) X = np.array(X) Y = np.array(Y) X = torch.from_numpy(X).float() # (batch_size, sequence_length, input_size) Y = torch.from_numpy(Y).float() print(f"数据格式:X = {X.shape}, Y = {Y.shape}") self.X = X self.Y = Y def re_slice(self, window): self.X, self.Y, self.model, self.optimizer, self.criterion, self.result, self.result_, self.losses = [None] * 9 self.slice(window) # 初始化 LSTM model def init_lstm(self, hidden=64, lr=0.001, num_layers=1): self.model = LSTM(self.input_dim, hidden, self.output_dim, num_layers) self.optimizer = torch.optim.Adam(self.model.parameters(), lr=lr) self.criterion = nn.MSELoss() if torch.cuda.is_available(): self.model.cuda() self.X.cuda() self.Y.cuda() # 总数据产生 batch 并可以进行 shuffle @staticmethod def iterate_batches(inputs, targets, batchsize, shuffle=True): assert len(inputs) == len(targets) if shuffle: indices = np.arange(len(inputs)) np.random.shuffle(indices) for start_idx in range(0, len(inputs) - batchsize + 1, batchsize): if shuffle: excerpt = indices[start_idx:start_idx + batchsize] else: excerpt = slice(start_idx, start_idx + batchsize) yield inputs[excerpt], targets[excerpt] # 开始训练 def train(self, num_epochs=100, batch_size=128, max_batch=False): losses = [] if self.model is None: raise ValueError("请先使用CustomLSTM.init_lstm初始化网络") if max_batch: batch_size = self.X.shape[0] else: self.check_batch(batch_size) for epoch in range(num_epochs): loss_all = 0 self.model.init_hidden(batch_size) for index, (batch_x, batch_y) in enumerate(CustomLSTM.iterate_batches(self.X, self.Y, batch_size, shuffle=True)): self.optimizer.zero_grad() outputs = self.model(batch_x) loss = self.criterion(outputs, batch_y) loss.backward() self.optimizer.step() loss_all += loss.detach().cpu() losses.append(loss_all / (index + 1)) if epoch % 20 == 0: print('Epoch [{}/{}], Loss: {:.4f}'.format(epoch + 1, num_epochs, loss_all / (index + 1))) self.losses = losses self.predicted() def plot_loss(self): if self.losses is None: raise ValueError("loss 不存在,请先进行训练") plt.figure(figsize=(12, 6)) plt.plot(self.losses) plt.xlabel("Epoch") plt.ylabel("Loss") plt.show() # 预测之后 n 天的值 def predict(self, n): # self.predicted() self.model.init_hidden(1) data = self.data[-self.window:, :].tolist() y = [] for i in range(n): x = torch.tensor(data).float().unsqueeze(0) result = self.model.cpu()(x).tolist() y.append(result[0]) data.append(result[0]) data.pop(0) self.result = np.array(y) return self.result # 预测从 window_size 到结束数据的预测值,以便与真实值做比较 def predicted(self): self.model.eval() self.model.init_hidden(len(self.X)) with torch.no_grad(): predicted = self.model(self.X) predicted = predicted.detach().cpu().numpy() self.result_ = predicted return self.result_ # 因为可以为二维数组即对多个变量进行预测,names即为每个变量的名字 def plot(self, names=None): if self.result is None: raise ValueError("请先使用CustomLSTM.predict") if names is None: names = [None] * len(self.data[0]) # further = self.predict(n) x = np.arange(len(self.data)) x_further = np.arange(len(self.data), len(self.data) + len(self.result)) plt.figure(figsize=(12, 6)) for i in range(len(self.data[0])): plt.plot(x, self.data, label=f'{names[0]} True Values') plt.plot(x[self.window:], self.result_[:, i], label=f'{names[0]} Predictions') plt.plot(x_further, self.result[:, i], label=f"{names[0]} Further Predictions") plt.show() # 画置信区间,demo 版本 # ToDo def plot_confidence(self, index=0, alpha=0.05): if self.result is None: raise ValueError("请先使用CustomLSTM.predict") plt.figure(figsize=(12, 6)) x = np.arange(len(self.data)) x_further = np.arange(len(self.data), len(self.data) + len(self.result)) y_true = self.data[self.window:, index].tolist() plt.plot(y_true, label='True Values') plt.plot(x_further, self.result[:, index], label="Further Predictions") plt.plot(x[self.window:], self.result_[:, index], label='Predictions') y_pred = self.result[:, index] lower, upper = Utils.ci(y_true, y_pred, alpha=alpha) plt.plot(y_pred, label='Predictions') plt.fill_between(np.arange(), lower, upper, alpha=0.2, label='Confidence interval') plt.legend() plt.show() # 打印 summary, 即 r^2 评价函数等 def summary(self): if self.model is None: raise ValueError("请先进行训练") print("==========Summary Begin===========") print("R2 =", score_r2 := Utils.r2(self.result_, self.data[self.window:, :])) print("MSE =", score_mse := Utils.mse(self.result_, self.data[self.window:, :])) print("RMSE =", score_rmse := np.sqrt(score_mse)) print("MAE =", score_mae := Utils.mae(self.result_, self.data[self.window:, :])) print("===========Summary end============") return score_r2, score_mse, score_rmse, score_mae # 工具 class Utils: # 置信区间,demo # ToDo @staticmethod def ci(y_true, y_pred, alpha=0.05): residuals = y_true - y_pred n = len(residuals) df = n - 1 t_value = st.norm.ppf(1 - alpha / 2, df) std_err = np.std(residuals, ddof=1) / np.sqrt(n) upper = residuals + t_value * std_err lower = residuals - t_value * std_err return lower, upper # r2 评价函数 @staticmethod def r2(y_pred, y_true): return 1 - ((y_pred - y_true) ** 2).sum(axis=0) / ((y_true.mean(axis=0) - y_true) ** 2).sum(axis=0) @staticmethod def mse(y_pred, y_true): return ((y_true - y_pred) ** 2).sum(axis=0) / len(y_pred) @staticmethod def rmse(y_pred, y_true): return np.sqrt(((y_true - y_pred) ** 2).sum(axis=0) / len(y_pred)) @staticmethod def mae(y_pred, y_true): return (np.absolute(y_true - y_pred)).sum(axis=0) / len(y_true) # 添加快捷打开文件操作 @staticmethod def openfile(name): file_type = name.split(".")[-1] if file_type == "csv": df = pd.read_csv(name, encoding='GBK') elif file_type == "xlsx" or file_type == "xls": df = pd.read_excel(name) else: raise TypeError(f"{name} 类型不是 csv, xls, xlsx") # df = df[["列名字1", "列名字2"]] print(df) return np.array(df) def load_data(): # 以下注释代码为快捷打开操作 # return Utils.openfile(file_location) data = np.sin(np.arange(100) * np.pi / 50) + np.random.randn(100) * 0.1 return data if __name__ == "__main__": # 加载数据 data = load_data() # 初始化网络 window_size = 10 batch = 100 lstm = CustomLSTM(data, window_size) lstm.init_lstm(hidden=64, lr=0.001, num_layers=1) # 训练网络 # max_batch 表示是否以整一个数据作为 batch 不做分割 lstm.train(num_epochs=1000, max_batch=True) # lstm.train(num_epochs=50, batch_size=30, max_batch=False) # 调整窗口大小重新训练 # lstm.re_slice(20) # lstm.init_lstm(hidden=64, lr=0.001, num_layers=1) # lstm.train(num_epochs=50, batch_size=40, max_batch=False) # 打印 summary r2, mse, rmse, mae = lstm.summary() # 预测之后 100 步数据 lstm.predict(100) # 画图 lstm.plot_loss() lstm.plot(['data']) # lstm.plot_confidence(index=0) |