I am a senior majoring in Automation at Hangzhou Dianzi University, and I will soon be a graduate student at the university's School of Electronic Information.
Favourite games: Arknights, CS:GO, and more.
You can also reach me on QQ: 2114496089; please note your purpose when sending a friend request.
Personal Blog
Because the font files are large, and Windows and macOS do not need the bundled fonts anyway, the no-fonts branch is recommended.
With font files:
git clone -b main --depth=1 http://git.pancake2021.work/pancake/GLT.git
or
http://git.pancake2021.work/pancake/GLT/archive/main.zip
Without font files:
git clone -b no-fonts --depth=1 http://git.pancake2021.work/pancake/GLT.git
or
http://git.pancake2021.work/pancake/GLT/archive/no-fonts.zip
Because using an LSTM directly is cumbersome in mathematical-contest modelling, I wrote a simple, easy-to-use wrapper.
The code below is a demo snapshot of the final version and is not recommended for direct use; the latest version is available through the download link or on CSDN.
import torch
import torch.nn as nn
import numpy as np
import matplotlib.pyplot as plt
import torch.nn.functional as F
import warnings
import pandas as pd
import scipy.stats as st

# fix the random seeds so that experiments are reproducible
torch.manual_seed(1)
np.random.seed(1)


# LSTM model definition
class LSTM(nn.Module):
    hidden = None

    def __init__(self, input_size, hidden_size, output_size, num_layers):
        super().__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        # As batch_first=True, input: (batch_size, sequence_length, input_size)
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.lstm2 = nn.LSTM(hidden_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, hidden_size)
        self.fcOut = nn.Linear(hidden_size, output_size)
        self.drop = nn.Dropout(0.5)
        # optional: the next line can be commented out
        self.apply(LSTM.init_weights)

    def forward(self, x):
        # detach the hidden state so loss.backward() does not try to
        # backpropagate through previous batches
        hidden = [each.data for each in self.hidden]
        x, hidden = self.lstm(x, hidden)
        x, self.hidden = self.lstm2(x, hidden)
        x = x[:, -1, :]
        x = F.relu(self.fc(x))
        x = self.drop(x)
        x = self.fcOut(x)
        return x

    def init_hidden(self, batch_size):
        # the new hidden/cell states are created on the same device as the weights
        weight = next(self.parameters()).data
        self.hidden = (weight.new(self.num_layers, batch_size, self.hidden_size).zero_(),
                       weight.new(self.num_layers, batch_size, self.hidden_size).zero_())

    @staticmethod
    def init_weights(m):
        if type(m) == nn.LSTM:
            for name, param in m.named_parameters():
                if 'weight_ih' in name:
                    torch.nn.init.orthogonal_(param.data)
                elif 'weight_hh' in name:
                    torch.nn.init.orthogonal_(param.data)
                elif 'bias' in name:
                    param.data.fill_(0)
        elif type(m) == nn.Conv1d or type(m) == nn.Linear:
            torch.nn.init.orthogonal_(m.weight)
            m.bias.data.fill_(0)


class CustomLSTM:
    X, Y, data, model, optimizer, criterion, result, result_, window, losses = [None] * 10

    def __init__(self, data: np.ndarray, window):
        self.data = data
        self.input_dim = self.init_data()
        self.output_dim = self.input_dim
        self.slice(window)

    # data may be a 1-D or 2-D array; a 1-D array is promoted to 2-D
    def init_data(self):
        assert (length := len(self.data.shape)) in [1, 2]
        if length == 1:
            self.data = self.data[:, np.newaxis]
        return len(self.data[0])

    # check whether the dataset size is divisible by the batch size
    def check_batch(self, batch_size):
        length = self.X.shape[0]
        # batch_size must not exceed the dataset size
        assert length >= batch_size
        if batch_size * (length // batch_size) != length:
            warnings.warn(f'Dataset size is {length} and batch size is {batch_size}; '
                          f'{(length % batch_size) / length * 100}% of the data will be dropped each epoch',
                          UserWarning)

    # slice the series with a window of length `window` to build the training samples
    def slice(self, window):
        self.window = window
        X, Y = [], []
        for i in range(len(self.data) - window):
            X.append(self.data[i:i + window])
            Y.append(self.data[i + window])
        X = np.array(X)
        Y = np.array(Y)
        X = torch.from_numpy(X).float()  # (batch_size, sequence_length, input_size)
        Y = torch.from_numpy(Y).float()
        print(f"Data shapes: X = {X.shape}, Y = {Y.shape}")
        self.X = X
        self.Y = Y

    def re_slice(self, window):
        self.X, self.Y, self.model, self.optimizer, self.criterion, self.result, self.result_, self.losses = [None] * 8
        self.slice(window)

    # initialize the LSTM model
    def init_lstm(self, hidden=64, lr=0.001, num_layers=1):
        self.model = LSTM(self.input_dim, hidden, self.output_dim, num_layers)
        self.optimizer = torch.optim.Adam(self.model.parameters(), lr=lr)
        self.criterion = nn.MSELoss()
        if torch.cuda.is_available():
            self.model.cuda()
            self.X = self.X.cuda()
            self.Y = self.Y.cuda()

    # generate (optionally shuffled) batches from the full dataset
    @staticmethod
    def iterate_batches(inputs, targets, batchsize, shuffle=True):
        assert len(inputs) == len(targets)
        if shuffle:
            indices = np.arange(len(inputs))
            np.random.shuffle(indices)
        for start_idx in range(0, len(inputs) - batchsize + 1, batchsize):
            if shuffle:
                excerpt = indices[start_idx:start_idx + batchsize]
            else:
                excerpt = slice(start_idx, start_idx + batchsize)
            yield inputs[excerpt], targets[excerpt]

    # training loop
    def train(self, num_epochs=100, batch_size=128, max_batch=False):
        losses = []
        if self.model is None:
            raise ValueError("Initialize the network with CustomLSTM.init_lstm first")
        if max_batch:
            batch_size = self.X.shape[0]
        else:
            self.check_batch(batch_size)
        for epoch in range(num_epochs):
            loss_all = 0
            self.model.init_hidden(batch_size)
            for index, (batch_x, batch_y) in enumerate(CustomLSTM.iterate_batches(self.X, self.Y, batch_size, shuffle=True)):
                self.optimizer.zero_grad()
                outputs = self.model(batch_x)
                loss = self.criterion(outputs, batch_y)
                loss.backward()
                self.optimizer.step()
                loss_all += loss.detach().cpu()
            losses.append(loss_all / (index + 1))
            if epoch % 20 == 0:
                print('Epoch [{}/{}], Loss: {:.4f}'.format(epoch + 1, num_epochs, loss_all / (index + 1)))
        self.losses = losses
        self.predicted()

    def plot_loss(self):
        if self.losses is None:
            raise ValueError("No losses recorded; train the model first")
        plt.figure(figsize=(12, 6))
        plt.plot(self.losses)
        plt.xlabel("Epoch")
        plt.ylabel("Loss")
        plt.show()

    # predict the next n steps
    def predict(self, n):
        # self.predicted()
        self.model.cpu()
        self.model.eval()
        self.model.init_hidden(1)
        data = self.data[-self.window:, :].tolist()
        y = []
        for i in range(n):
            x = torch.tensor(data).float().unsqueeze(0)
            result = self.model(x).tolist()
            y.append(result[0])
            data.append(result[0])
            data.pop(0)
        self.result = np.array(y)
        return self.result

    # predictions from window_size to the end of the data, for comparison with the true values
    def predicted(self):
        self.model.eval()
        self.model.init_hidden(len(self.X))
        with torch.no_grad():
            predicted = self.model(self.X)
            predicted = predicted.detach().cpu().numpy()
        self.result_ = predicted
        return self.result_

    # data may hold several variables (2-D array); names gives one label per variable
    def plot(self, names=None):
        if self.result is None:
            raise ValueError("Call CustomLSTM.predict first")
        if names is None:
            names = [None] * len(self.data[0])
        x = np.arange(len(self.data))
        x_further = np.arange(len(self.data), len(self.data) + len(self.result))
        plt.figure(figsize=(12, 6))
        for i in range(len(self.data[0])):
            plt.plot(x, self.data[:, i], label=f'{names[i]} True Values')
            plt.plot(x[self.window:], self.result_[:, i], label=f'{names[i]} Predictions')
            plt.plot(x_further, self.result[:, i], label=f"{names[i]} Further Predictions")
        plt.legend()
        plt.show()

    # plot a confidence interval, demo version (still unfinished)
    # ToDo
    def plot_confidence(self, index=0, alpha=0.05):
        if self.result is None:
            raise ValueError("Call CustomLSTM.predict first")
        plt.figure(figsize=(12, 6))
        x = np.arange(len(self.data))
        x_further = np.arange(len(self.data), len(self.data) + len(self.result))
        y_true = self.data[self.window:, index].tolist()
        plt.plot(y_true, label='True Values')
        plt.plot(x_further, self.result[:, index], label="Further Predictions")
        plt.plot(x[self.window:], self.result_[:, index], label='Predictions')
        y_pred = self.result[:, index]
        lower, upper = Utils.ci(y_true, y_pred, alpha=alpha)
        plt.plot(y_pred, label='Predictions')
        plt.fill_between(np.arange(len(lower)), lower, upper, alpha=0.2, label='Confidence interval')
        plt.legend()
        plt.show()

    # print a summary: R^2 and the other metrics
    def summary(self):
        if self.model is None:
            raise ValueError("Train the model first")
        print("==========Summary Begin===========")
        print("R2 =", score_r2 := Utils.r2(self.result_, self.data[self.window:, :]))
        print("MSE =", score_mse := Utils.mse(self.result_, self.data[self.window:, :]))
        print("RMSE =", score_rmse := np.sqrt(score_mse))
        print("MAE =", score_mae := Utils.mae(self.result_, self.data[self.window:, :]))
        print("===========Summary end============")
        return score_r2, score_mse, score_rmse, score_mae


# utilities
class Utils:
    # confidence interval, demo
    # ToDo
    @staticmethod
    def ci(y_true, y_pred, alpha=0.05):
        residuals = np.asarray(y_true) - np.asarray(y_pred)
        n = len(residuals)
        df = n - 1
        t_value = st.t.ppf(1 - alpha / 2, df)
        std_err = np.std(residuals, ddof=1) / np.sqrt(n)
        upper = residuals + t_value * std_err
        lower = residuals - t_value * std_err
        return lower, upper

    # R^2 metric
    @staticmethod
    def r2(y_pred, y_true):
        return 1 - ((y_pred - y_true) ** 2).sum(axis=0) / ((y_true.mean(axis=0) - y_true) ** 2).sum(axis=0)

    @staticmethod
    def mse(y_pred, y_true):
        return ((y_true - y_pred) ** 2).sum(axis=0) / len(y_pred)

    @staticmethod
    def rmse(y_pred, y_true):
        return np.sqrt(((y_true - y_pred) ** 2).sum(axis=0) / len(y_pred))

    @staticmethod
    def mae(y_pred, y_true):
        return (np.absolute(y_true - y_pred)).sum(axis=0) / len(y_true)

    # convenience file loader
    @staticmethod
    def openfile(name):
        file_type = name.split(".")[-1]
        if file_type == "csv":
            df = pd.read_csv(name, encoding='GBK')
        elif file_type == "xlsx" or file_type == "xls":
            df = pd.read_excel(name)
        else:
            raise TypeError(f"{name} is not a csv, xls or xlsx file")
        # df = df[["column name 1", "column name 2"]]
        print(df)
        return np.array(df)


def load_data():
    # the commented line below loads data from a file instead of generating a toy series
    # return Utils.openfile(file_location)
    data = np.sin(np.arange(100) * np.pi / 50) + np.random.randn(100) * 0.1
    return data


if __name__ == "__main__":
    # load the data
    data = load_data()
    # initialize the network
    window_size = 10
    batch = 100
    lstm = CustomLSTM(data, window_size)
    lstm.init_lstm(hidden=64, lr=0.001, num_layers=1)
    # train the network
    # max_batch: use the whole dataset as a single batch, without splitting
    lstm.train(num_epochs=1000, max_batch=True)
    # lstm.train(num_epochs=50, batch_size=30, max_batch=False)
    # change the window size and retrain
    # lstm.re_slice(20)
    # lstm.init_lstm(hidden=64, lr=0.001, num_layers=1)
    # lstm.train(num_epochs=50, batch_size=40, max_batch=False)
    # print the summary
    r2, mse, rmse, mae = lstm.summary()
    # predict the next 100 steps
    lstm.predict(100)
    # plots
    lstm.plot_loss()
    lstm.plot(['data'])
    # lstm.plot_confidence(index=0)
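If you want to run the wrapper on real data instead of the toy sine series, a minimal usage sketch could look like this (the file name data.csv, the column slice, and the hyper-parameters below are placeholders, not part of the original code):

# hypothetical example: forecast the first two numeric columns of a CSV file
raw = Utils.openfile("data.csv")             # loads the sheet as a numpy array
series = raw[:, :2].astype(float)            # columns to forecast (placeholder choice)

lstm = CustomLSTM(series, window=10)         # build (window -> next step) samples
lstm.init_lstm(hidden=64, lr=0.001, num_layers=1)
lstm.train(num_epochs=200, batch_size=32)    # or max_batch=True for full-batch training
lstm.summary()                               # R2 / MSE / RMSE / MAE on the fitted range
future = lstm.predict(30)                    # roll the model forward 30 steps
lstm.plot(names=["col1", "col2"])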
This post presents our team's solution to Problem C of the 2022 Xiaomei Cup (小美赛), including the write-up, the code, and the contest data. Everything is written in Python; code comments and detailed explanations of the logic are largely omitted.
The problem is essentially time-series classification. The methods our team tried are listed below.
Order of presentation: Python libraries → data preprocessing (the 3σ rule and filtering) → data processing and loading (without the preprocessing from the previous step) → decision-tree-style classifiers → TSFC → DCLSTM → GA-based pruning/optimization of the decision trees. Overall conclusion: Extra-Trees (extremely randomized trees) gave the best classification results on the contest data.
Paper outline: data processing → decision-tree classification → DCLSTM → algorithm comparison → NSGA-II optimization of Extra-Trees against overfitting.
The code blocks build on one another, so if a function is missing from a later block, look for it in an earlier one. There is a lot of code; please be patient.
pip3 install numpy matplotlib pandas
pip3 install scikit-learn
pip3 install torch        # install the build that matches your GPU / CUDA version
pip3 install pyts
pip3 install geatpy
pip3 install pykalman     # assumed requirement for the Kalman-filter preprocessing below
# math
import math
# plot
import matplotlib.pyplot as plt
# warning
import warnings
# system process
import os
# numpy
import numpy as np
# pandas
import pandas as pd
# data loader
import pickle
# csv
import csv
# cross score
from sklearn.model_selection import cross_val_score
# tree-based classifiers
from sklearn.ensemble import RandomForestClassifier
from sklearn.ensemble import ExtraTreesClassifier
from sklearn.tree import DecisionTreeClassifier
# train test split
from sklearn.model_selection import train_test_split
# normalize
from sklearn.preprocessing import normalize
# scores
from sklearn.metrics import f1_score, precision_score, recall_score, accuracy_score, roc_auc_score, roc_curve, auc, confusion_matrix
# draw tree
from sklearn.tree import plot_tree
# classifiers
from pyts.classification import BOSSVS, TimeSeriesForest
# multivariate
from pyts.multivariate.classification import MultivariateClassifier
import torch
from torch import nn
import torch.nn.functional as F
import geatpy as ea
from multiprocessing import Pool as ProcessPool
def three_sigma(col):
    # return the indices of samples lying outside mean ± 3 * std
    rule = (col.mean() - 3 * col.std() > col) | (col.mean() + 3 * col.std() < col)
    index = np.arange(col.shape[0])[rule]
    return index
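The rule implemented above flags the i-th sample of a column with mean $\mu$ and standard deviation $\sigma$ as an outlier exactly when

$$|x_i - \mu| > 3\sigma,$$

and returns the indices of the offending samples; the preprocessing code further down replaces each flagged value with the mean of its local neighbourhood.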
""" https://www.bilibili.com/read/cv17553128 """ result = [] for action in range(19): filename = "./data/a{}/p1".format(str(action + 1).zfill(2)) now = None for i in range(60): f = np.loadtxt(os.path.join(filename, "s.txt".format(str(i + 1).zfill(2))), delimiter=",").T if now is None: now = f else: now = np.c_[now, f] data = now[:, :300] for i in [2]: AccX_Value = data[i] outs = three_sigma(AccX_Value) for out in outs: AccX_Value[out] = AccX_Value[out - 5: out + 5].mean() AccX_Value = AccX_Value[:, np.newaxis] Time = np.linspace(0, 5 * 2.4, int(5 * 2.4 * 25)) AccX_Variance = 0.01 # time step dt = 1 / 25 # transition_matrix F = [[1, dt, 0.5 * dt ** 2], [0, 1, dt], [0, 0, 1]] # observation_matrix H = [0, 0, 1] # transition_covariance Q = [[0.2, 0, 0], [0, 0.1, 0], [0, 0, 10e-4]] # observation_covariance R = AccX_Variance # initial_state_mean X0 = [0, 0, AccX_Value[0, 0]] # initial_state_covariance P0 = [[0, 0, 0], [0, 0, 0], [0, 0, AccX_Variance]] n_timesteps = AccX_Value.shape[0] n_dim_state = 3 filtered_state_means = np.zeros((n_timesteps, n_dim_state)) filtered_state_covariances = np.zeros((n_timesteps, n_dim_state, n_dim_state)) kf = KalmanFilter(transition_matrices=F, observation_matrices=H, transition_covariance=Q, observation_covariance=R, initial_state_mean=X0, initial_state_covariance=P0) # iterative estimation for each new measurement for t in range(n_timesteps): if t == 0: filtered_state_means[t] = X0 filtered_state_covariances[t] = P0 else: filtered_state_means[t], filtered_state_covariances[t] = ( kf.filter_update( filtered_state_means[t - 1], filtered_state_covariances[t - 1], AccX_Value[t, 0] ) ) result.append(list(filtered_state_means[:, 2])) # plt.plot(Time, AccX_Value) # plt.plot(Time, filtered_state_means[:, 2], "r-") # plt.title('Acceleration X') # plt.ylim(7.5, 8.5) # plt.grid() # plt.legend() # plt.savefig("original.png") # plt.show() f = csv.writer(open("result filter.csv", "w", newline="")) f.writerows(result) |
# n is the window size
def sliding_window_filter(data_filter, n):
    n -= 1
    length = len(data_filter)
    assert 1 <= n <= length
    filtered_data = np.copy(data_filter).astype(np.float32)
    for i in range(n):
        filtered_data[n:] += data_filter[i: length - n + i]
    filtered_data[n:] *= 1 / (n + 1)
    return filtered_data
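As a minimal sketch of how the two preprocessing helpers fit together (assuming a 45 x 300 segment `data` as assembled in the Kalman-filter block above; the channel index 2 and the window size 5 are illustrative choices, not fixed by the contest code):

col = data[2].astype(float)                          # one sensor channel from the segment
for out in three_sigma(col):                         # indices that violate the 3-sigma rule
    col[out] = col[max(out - 5, 0): out + 5].mean()  # replace each outlier with a local mean
smoothed = sliding_window_filter(col, 5)             # trailing moving average over 5 samples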
Here, num selects which sensor columns to use and n_class selects which activities to classify.
# standard data container, used like a dataloader
class _Data:
    def __init__(self, data, label, length=None):
        self.x = data
        self.y = label
        if length:
            self.len = length
        else:
            self.len = len(self.y)

    def __len__(self):
        return self.len

    def __getitem__(self, item):
        return self.x[item], self.y[item]


# feature extraction
def process(data):
    arange = np.arange(0, len(data))[:, np.newaxis]
    varange = np.sum(arange ** 2)
    mean = data.mean(axis=0)
    std = np.std(data, axis=0)
    k = np.sum((data - mean) * arange / varange, axis=0)
    return np.r_[mean, std, k]


# load the data, taking `step` 5-second files per sample
def load_data(step=1, process=None):
    num = [i for i in range(45)]
    dirname = "./data/"
    n_class = np.array(os.listdir(dirname))
    now = []
    label = []
    for y, class_name in enumerate(n_class):
        print(f"now process {y + 1} ")
        dir_a = os.listdir(now_dir_name := os.path.join(dirname, class_name))
        for person in range(len(dir_a)):
            dir_b = os.listdir(now_file_name := os.path.join(now_dir_name, dir_a[person]))
            for segment in range(0, (len(dir_b) + 1) // step * step - 1, step):
                temp = None
                for i in range(step):
                    if temp is None:
                        temp = np.loadtxt(os.path.join(now_file_name, dir_b[i + segment]), delimiter=",")[:, num]
                    else:
                        temp = np.r_[temp, np.loadtxt(os.path.join(now_file_name, dir_b[i + segment]), delimiter=",")[:, num]]
                temp = normalize(temp, axis=0)
                now.append(process(temp) if process is not None else temp)
                label.append(y)
    data = _Data(np.array(now), np.array(label))
    return data


# cut each 5-second file into `cut` smaller pieces
def load_data_cut(cut=3, process=None):
    num = [i for i in range(45)]
    dirname = "./data/"
    n_class = os.listdir(dirname)
    now = []
    label = []
    for y, class_name in enumerate(n_class[:]):
        print(f"now process {y + 1} ")
        dir_a = os.listdir(now_dir_name := os.path.join(dirname, class_name))
        for person in range(len(dir_a)):
            dir_b = os.listdir(now_file_name := os.path.join(now_dir_name, dir_a[person]))
            for segment in range(0, len(dir_b)):
                file = np.loadtxt(os.path.join(now_file_name, dir_b[segment]), delimiter=",")[:, num]
                number = len(file) // cut
                for i in range(cut):
                    file_cut = normalize(file[number * i: number * (i + 1), :], axis=0)
                    now.append(process(file_cut) if process is not None else file_cut)
                    label.append(y)
    data = _Data(np.array(now), np.array(label))
    return data


# shuffle
def shuffleData(X, y, seed=None):
    import random
    random.seed(seed)
    index = [i for i in range(len(X))]
    random.shuffle(index)
    X = X[index]
    y = y[index]
    return X, y
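For each sensor channel, process() above compresses a segment $x_0, \dots, x_{T-1}$ into three numbers: the mean $\bar{x}$, the standard deviation, and a least-squares-style trend coefficient

$$k = \frac{\sum_{t=0}^{T-1} (x_t - \bar{x})\, t}{\sum_{t=0}^{T-1} t^2},$$

so with 45 sensor columns each sample becomes a 135-dimensional feature vector for the tree classifiers.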
# standard way (one 5-second file per sample); step is an int
data = load_data(step=1, process=process)
# cut the data into smaller pieces; cut is an int
data = load_data_cut(cut=3, process=process)
# save the data
f = open("data.data", 'wb')
pickle.dump(data, f)
# load the data
f = open("data.data", 'rb')
data = pickle.load(f)
data = _Data(*shuffleData(data.x, data.y, 1))
Note: the code below includes k-fold cross-validation. The commented-out lines serve other purposes; uncomment whichever parts produce the plots or scores you need.
f = open("data.data", 'rb') data = pickle.load(f) data = _Data(*shuffleData(data.x, data.y, 1)) result = [] K = 10 size = (n := len(data)) // K # K-fold cross validation for i in range(10): now_silence = [i for i in range(i * size)] + [i for i in range((i + 1) * size, n)] now_silence_test = [i for i in range(i * size, (i + 1) * size)] X_train, X_test, y_train, y_test = data.x[now_silence], data.x[now_silence_test], data.y[now_silence], data.y[now_silence_test] clf3 = ExtraTreesClassifier(max_depth=10, max_leaf_nodes=100, min_samples_split=2, random_state=0) clf3.fit(X_train, y_train) out = clf3.predict(X_test) result.append([f1_score(y_test, out, average='weighted'), precision_score(y_test, out, average='weighted'), recall_score(y_test, out, average='weighted'), accuracy_score(y_test, out)]) # clf1 = DecisionTreeClassifier(max_depth=None, min_samples_split=2, random_state=0) # clf2 = RandomForestClassifier(n_estimators=10, max_depth=None, min_samples_split=2, bootstrap=True) # clf1.fit(x_train, y_train) # clf2.fit(x_train, y_train) # print(clf1.feature_importances_) # print(clf2.feature_importances_) # print(clf3.feature_importances_) # result.append([clf1.score(x_test, y_test), clf2.score(x_test, y_test), clf3.score(x_test, y_test)]) # confusion_matrix(y_test, clf3.predict(y_test)) # scores1 = cross_val_score(clf1, x_train, y_train) # scores2 = cross_val_score(clf2, x_train, y_train) # scores3 = cross_val_score(clf3, x_train, y_train) # print('DecisionTreeClassifier交叉验证准确率为:' + str(scores1.mean())) # print('RandomForestClassifier交叉验证准确率为:' + str(scores2.mean())) # print('ExtraTreesClassifier交叉验证准确率为:' + str(scores3.mean())) print(result) print(np.array(result).mean(axis=0)) f = open("result tree k fold.txt", 'w') f.writelines(str(result)) |
dirname = "./data/" # 具体让那几个东西进行分类 n_class = os.listdir(dirname) # n_class = [n_class[0], n_class[17]] # 具体使用那些传感器 num = [i for i in range(45)] now = [] label = [] for y, class_name in enumerate(n_class): print(f"now process {y + 1} ") for person, person_name in enumerate(os.listdir(now_dir_name := os.path.join(dirname, class_name))[:]): for segment, segment_name in enumerate(os.listdir(now_file_name := os.path.join(now_dir_name, person_name))[:]): now.append(np.loadtxt(os.path.join(now_file_name, segment_name), delimiter=",")[:, num].T) label.append(y) data = _Data(np.array(now), np.array(label)) print("data process finished, start TSFC") X_train, X_test, y_train, y_test = train_test_split(data.x, data.y, test_size=0.1) clf = MultivariateClassifier(TimeSeriesForest(n_jobs=6)) # print(clf.estimators_[0]._pipeline['rfc'].estimators_) clf.fit(X_train, y_train) print(clf.score(X_test, y_test)) with open("clf.data", 'wb') as f: pickle.dump(clf, f) f.close() |
with open("clf.data", 'rb') as f: clf = pickle.load(f) print(clf) f.close() print(len(clf.estimators_[0].feature_importances_)) # now = [] # label = [] # for y, class_name in enumerate(n_class): # print(f"now process {y + 1} ") # for person, person_name in enumerate(os.listdir(now_dir_name := os.path.join(dirname, class_name))[:]): # for segment, segment_name in enumerate(os.listdir(now_file_name := os.path.join(now_dir_name, person_name))[:]): # now.append(np.loadtxt(os.path.join(now_file_name, segment_name), delimiter=",")[:, num].T) # label.append(y) # data = _Data(np.array(now), np.array(label)) # print(clf.score(data.x, data.y)) |
""" https://github.com/dspanah/Sensor-Based-Human-Activity-Recognition-DeepConvLSTM-Pytorch """ def load_dataset(filename, num, step=None): dirname = filename now = [] label = [] for y, class_name in enumerate(os.listdir(dirname)[:]): print(f"now process {y + 1} ") dir_a = os.listdir(now_dir_name := os.path.join(dirname, class_name)) for person in range(len(dir_a)): dir_b = os.listdir(now_file_name := os.path.join(now_dir_name, dir_a[person])) for segment in range(0, (len(dir_b) + 1) // step * step - 1, step): temp = None for i in range(step): if temp is None: temp = np.loadtxt(os.path.join(now_file_name, dir_b[i + segment]), delimiter=",")[:, num].T else: temp = np.r_[temp, np.loadtxt(os.path.join(now_file_name, dir_b[i + segment]), delimiter=",")[:, num].T] now.append(temp) label.append(y) x = torch.nn.functional.normalize(torch.tensor(torch.from_numpy(np.array(now)), dtype=torch.float32), dim=-1) # x = torch.tensor(torch.from_numpy(np.array(now)), dtype=torch.float32) data = _Data(x, torch.from_numpy(np.array(label)).long()) return data # data1 无滤波归一化 data2滤波无归一化 data3滤波归一化 datafile = "./data.data" if os.path.exists(datafile): with open(datafile, 'rb') as f: data = pickle.load(f) f.close() else: print("Loading data...") num = [i for i in range(45)] data = load_dataset('./data/', num, 1) print("Done") with open(datafile, 'wb') as f: pickle.dump(data, f) f.close() X_train, X_test, y_train, y_test = train_test_split(data.x, data.y, test_size=0.3, random_state=1) # for i in range(len(X_test)): # for j in range(-6, 0): # X_test[i][j][:] = torch.tensor([0]*125) print(X_train.shape) _, SLIDING_WINDOW_LENGTH, NB_SENSOR_CHANNELS = X_train.shape class HARModel(nn.Module): def __init__(self, n_hidden=128, n_layers=1, n_filters=100, n_classes=19, filter_size=1, drop_prob=0.5): super(HARModel, self).__init__() self.drop_prob = drop_prob self.n_layers = n_layers self.n_hidden = n_hidden self.n_filters = n_filters self.n_classes = n_classes self.filter_size = (filter_size,) self.conv1 = nn.Conv1d(NB_SENSOR_CHANNELS, n_filters, self.filter_size) self.conv2 = nn.Conv1d(n_filters, n_filters, self.filter_size) self.conv3 = nn.Conv1d(n_filters, n_filters, self.filter_size) self.conv4 = nn.Conv1d(n_filters, n_filters, self.filter_size) self.lstm1 = nn.LSTM(n_filters, n_hidden, n_layers) self.lstm2 = nn.LSTM(n_hidden, n_hidden, n_layers) self.fc = nn.Linear(n_hidden, n_classes) self.dropout = nn.Dropout(drop_prob) def forward(self, x, hidden, batch_size): x = x.view(-1, NB_SENSOR_CHANNELS, SLIDING_WINDOW_LENGTH) x = F.relu(self.conv1(x)) x = F.relu(self.conv2(x)) x = F.relu(self.conv3(x)) x = F.relu(self.conv4(x)) x = x.view(-1, batch_size, self.n_filters) # x = x.view(SLIDING_WINDOW_LENGTH, -1, NB_SENSOR_CHANNELS) x, hidden = self.lstm1(x, hidden) x, hidden = self.lstm2(x, hidden) x = x.contiguous().view(-1, self.n_hidden) x = self.dropout(x) x = self.fc(x) out = x.view(batch_size, -1, self.n_classes)[:, -1, :] return out, hidden def init_hidden(self, batch_size): ''' Initializes hidden state ''' # Create two new tensors with sizes n_layers x batch_size x n_hidden, # initialized to zero, for hidden state and cell state of LSTM weight = next(self.parameters()).data if (train_on_gpu): hidden = (weight.new(self.n_layers, batch_size, self.n_hidden).zero_().cuda(), weight.new(self.n_layers, batch_size, self.n_hidden).zero_().cuda()) else: hidden = (weight.new(self.n_layers, batch_size, self.n_hidden).zero_(), weight.new(self.n_layers, batch_size, self.n_hidden).zero_()) return hidden net = HARModel() def 
init_weights(m): if type(m) == nn.LSTM: for name, param in m.named_parameters(): if 'weight_ih' in name: torch.nn.init.orthogonal_(param.data) elif 'weight_hh' in name: torch.nn.init.orthogonal_(param.data) elif 'bias' in name: param.data.fill_(0) elif type(m) == nn.Conv1d or type(m) == nn.Linear: torch.nn.init.orthogonal_(m.weight) m.bias.data.fill_(0) net.apply(init_weights) def iterate_minibatches(inputs, targets, batchsize, shuffle=True): assert len(inputs) == len(targets) if shuffle: indices = np.arange(len(inputs)) np.random.shuffle(indices) for start_idx in range(0, len(inputs) - batchsize + 1, batchsize): if shuffle: excerpt = indices[start_idx:start_idx + batchsize] else: excerpt = slice(start_idx, start_idx + batchsize) yield inputs[excerpt], targets[excerpt] ## check if GPU is available train_on_gpu = torch.cuda.is_available() if (train_on_gpu): print('Training on GPU!') else: print('No GPU available, training on CPU; consider making n_epochs very small.') def train(net, epochs=1000, batch_size=400, lr=0.001): opt = torch.optim.Adam(net.parameters(), lr=lr) criterion = nn.CrossEntropyLoss() if (train_on_gpu): net.cuda() for e in range(epochs): # initialize hidden state h = net.init_hidden(batch_size) train_losses = [] net.train() for batch in iterate_minibatches(X_train, y_train, batch_size): inputs, targets = batch if (train_on_gpu): inputs, targets = inputs.cuda(), targets.cuda() # Creating new variables for the hidden state, otherwise # we'd backprop through the entire training history h = tuple([each.data for each in h]) # zero accumulated gradients opt.zero_grad() # get the output from the model output, h = net(inputs, h, batch_size) loss = criterion(output, targets.long()) train_losses.append(loss.item()) loss.backward() opt.step() val_h = net.init_hidden(batch_size) val_losses = [] accuracy = 0 f1score = 0 net.eval() with torch.no_grad(): for batch in iterate_minibatches(X_test, y_test, batch_size): inputs, targets = batch val_h = tuple([each.data for each in val_h]) if (train_on_gpu): inputs, targets = inputs.cuda(), targets.cuda() output, val_h = net(inputs, val_h, batch_size) # print(confusion_matrix(y_train, output)) val_loss = criterion(output, targets.long()) val_losses.append(val_loss.item()) top_p, top_class = output.topk(1, dim=1) equals = top_class == targets.view(*top_class.shape).long() accuracy += torch.mean(equals.type(torch.FloatTensor)) f1score += metrics.f1_score(top_class.cpu(), targets.view(*top_class.shape).long().cpu(), average='weighted') net.train() # reset to train mode after iterationg through validation data print("Epoch: {}/{}...".format(e + 1, epochs), "Train Loss: {:.4f}...".format(np.mean(train_losses)), "Val Loss: {:.4f}...".format(np.mean(val_losses)), "Val Acc: {:.4f}...".format(accuracy / (len(X_test) // batch_size)), "F1-Score: {:.4f}...".format(f1score / (len(X_test) // batch_size))) train(net) |
# custom GA problem
class My_nsga(ea.Problem):
    def __init__(self, pools=10):
        # search ranges: max_depth 18-30, min_samples_leaf 10-20, max_features 85-135,
        # max_leaf_nodes 50-200, ccp_alpha 0.0001-0.002
        max_depth = [18, 30]
        min_samples_leaf = [10, 20]
        max_feature = [85, 135]
        max_leaf_node = [50, 200]
        ccp_alpha = [0.0001, 0.002]
        name = 'Tree Classifier'
        M = 2
        maxormins = [-1, 1]
        Dim = 5
        varTypes = [1] * 4 + [0]
        lb, ub = list(zip(*[max_depth, min_samples_leaf, max_feature, max_leaf_node, ccp_alpha]))
        lb = list(lb)
        ub = list(ub)
        lbin = [1] * Dim
        ubin = [1] * Dim
        self.ans = {}
        self.max_score = -float("inf")
        self.epoch = 0
        self.pool = ProcessPool(pools)
        ea.Problem.__init__(self, name, M, maxormins, Dim, varTypes, lb, ub, lbin, ubin)

    # objective function, evaluated in parallel with a process pool
    # @cal_time()
    def evalVars(self, Vars):
        global data
        args = []
        for i in range(len(Vars)):
            varibal = list(Vars[i])
            args.append(varibal + [data.x, data.y])
        result = self.pool.starmap_async(get_ans, args)
        result.wait()
        ans = result.get()
        ans = np.array(ans)
        print(f"Epoch: {self.epoch}, Epoch Max: {ans.max(axis=0)}")
        self.epoch += 1
        return ans


def get_ans(max_depth, min_samples_leaf, max_feature, max_leaf_node, ccp_alpha, x, y):
    score = []
    max_depth, min_samples_leaf, max_feature, max_leaf_node = map(int, (max_depth, min_samples_leaf, max_feature, max_leaf_node))
    for i in range(10):
        x_train, x_test, y_train, y_test = train_test_split(x, y, train_size=0.7)
        clf = ExtraTreesClassifier(max_depth=max_depth, min_samples_leaf=min_samples_leaf, max_features=max_feature,
                                   max_leaf_nodes=max_leaf_node, ccp_alpha=ccp_alpha, n_estimators=10,
                                   min_samples_split=2, bootstrap=False)
        clf.fit(x_train, y_train)
        score1 = float(clf.score(x_test, y_test))
        score2 = float(clf.score(x_train, y_train))
        score.append([score1, abs(score1 - score2)])
    return np.array(score).mean(axis=0)


# run the GA
# @cal_time()
def Run_nsga(ndind=10, maxgen=100, pools=10):
    problem = My_nsga(pools)
    encoding = "RI"
    # inspect the chromosome encoding
    # field = ea.crtfld(encoding, problem.varTypes, problem.ranges, problem.borders)
    # print(ea.crtpc(encoding, ndind, field))
    myAlgorithm = ea.moea_NSGA2_templet(problem, ea.Population(Encoding=encoding, NIND=ndind), MAXGEN=maxgen, logTras=1, drawing=1)
    res = ea.optimize(myAlgorithm, seed=1, verbose=True, drawing=1, outputMsg=1, drawLog=1, saveFlag=1, dirName='result')
    print(res)


if __name__ == "__main__":
    with open(r"./Datas/data_tree_{}".format(0), 'rb') as f:
        data = pickle.load(f)
    f.close()
    Run_nsga(30, 100, 14)
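Written out, the two objectives that NSGA-II optimizes above (maxormins = [-1, 1]) are, for a hyper-parameter vector θ = (max_depth, min_samples_leaf, max_features, max_leaf_nodes, ccp_alpha) and averaged over the 10 random 70/30 splits in get_ans:

$$\max_{\theta}\ \frac{1}{10}\sum_{j=1}^{10} \mathrm{acc}^{(j)}_{\mathrm{test}}(\theta), \qquad \min_{\theta}\ \frac{1}{10}\sum_{j=1}^{10} \left|\mathrm{acc}^{(j)}_{\mathrm{test}}(\theta) - \mathrm{acc}^{(j)}_{\mathrm{train}}(\theta)\right|,$$

where the second objective penalizes the train/test gap and therefore the overfitting of the Extra-Trees model.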
https://xiezuocat.com/#/quantum
It is really easy to use, and it is still free at the moment.
To handle more complex problems that mix different chromosome encodings, here is a code walkthrough. Personally I think this genetic-algorithm library (geatpy) is very well made: quick to pick up, broadly applicable, and pleasant to use, so I strongly recommend it.
Example setup:
a = [_num continuous values in the range 0 to 30]
b = [a permutation of 1 to _num], i.e. the integers 1 to _num arranged in some order without repetition
c = [_num discrete (integer) values in the range 0 to 30]
import geatpy as ea
import numpy as np


# Example setup:
# a = [_num continuous values in the range 0 to 30]
# b = [a permutation of 1.._num], i.e. the integers 1 to _num in some order without repetition
# c = [_num discrete (integer) values in the range 0 to 30]
def func(a, b, c):
    return np.sin(np.pi * a * b) * np.cos(np.pi * c * b) * np.tan(np.pi * a * b * c)


# problem size
_num = 10


# custom GA problem
class My_nsga(ea.Problem):
    def __init__(self):
        global _num
        name = 'GEATPY Facilitate Learning'
        # only one objective (func)
        M = 1
        # maximize it
        maxormins = [-1] * M
        # a.size + b.size + c.size variables in total
        Dim = _num + _num + _num
        # a continuous, b and c discrete
        varTypes = [0] * _num + [1] * _num + [1] * _num
        # a in [0, 30], b in [1, _num], c in [0, 30]
        lb = [0] * _num + [1] * _num + [0] * _num
        ub = [30] * _num + [_num] * _num + [30] * _num
        # lower bounds inclusive
        lbin = [1] * Dim
        # upper bounds inclusive
        ubin = [1] * Dim
        # best value seen so far
        self.max_ans = 0
        # epoch counter
        self.epoch = 0
        # super
        ea.Problem.__init__(self, name, M, maxormins, Dim, varTypes, lb, ub, lbin, ubin)

    # objective function
    def evalVars(self, Vars):
        # init ans
        ans = np.zeros(len(Vars), dtype=float).reshape(len(Vars), 1)
        for i in range(len(Vars)):
            # make sure the slices pick out the right block of variables
            a = Vars[i][:_num]
            # cast to int to guard against the values coming back as floats,
            # which can break problem-specific code
            b = list(map(int, Vars[i][_num: _num + _num]))
            c = Vars[i][-_num:]
            score = func(a, b, c)
            ans[i] = score.max()
        self.max_ans = max(self.max_ans, (now_max := ans.max()))
        print(f"Epoch: {self.epoch + 1}, Epoch Max: {now_max}, Global Max: {self.max_ans}")
        self.epoch += 1
        return ans


# run the GA
def Run_nsga(ndind=10, maxgen=100, *args, **kwargs):
    global _num
    # init problem
    problem = My_nsga()
    # chromosome encodings: [real/integer ("RI"), permutation ("P"), real/integer ("RI")]
    encodings = ["RI", "P", "RI"]
    # build one Field per encoding; this extra step is required because geatpy cannot yet
    # take a list of encodings directly, and the index ranges must match the variable layout
    # (check them carefully if you get errors!)
    field1 = ea.crtfld(encodings[0], problem.varTypes[:_num], problem.ranges[:, :_num], problem.borders[:, :_num])
    field2 = ea.crtfld(encodings[1], problem.varTypes[_num: _num + _num], problem.ranges[:, _num: _num + _num], problem.borders[:, _num: _num + _num])
    field3 = ea.crtfld(encodings[2], problem.varTypes[-_num:], problem.ranges[:, -_num:], problem.borders[:, -_num:])
    # combine the fields
    fields = [field1, field2, field3]
    # plug them into the template
    myAlgorithm = ea.soea_psy_EGA_templet(problem, ea.PsyPopulation(Encodings=encodings, Fields=fields, NIND=ndind), MAXGEN=maxgen, logTras=0)
    # whether to draw plots
    myAlgorithm.drawing = 0
    # optimize
    res = ea.optimize(myAlgorithm, seed=1, verbose=False, drawing=0, outputMsg=True, drawLog=False, saveFlag=False, dirName='result')
    return res['Vars'][0]


if __name__ == "__main__":
    Run_nsga(50, 1000)