"""
https://github.com/dspanah/Sensor-Based-Human-Activity-Recognition-DeepConvLSTM-Pytorch
"""
def load_dataset(filename, num, step=1):
    """Walk the dataset directory and build (samples, labels) tensors.

    `filename` is the dataset root, `num` lists the sensor columns to keep,
    and `step` is the number of consecutive segment files stacked per sample.
    """
    dirname = filename
    now = []
    label = []
    for y, class_name in enumerate(os.listdir(dirname)):
        print(f"now processing class {y + 1}")
        dir_a = os.listdir(now_dir_name := os.path.join(dirname, class_name))
        for person in range(len(dir_a)):
            dir_b = os.listdir(now_file_name := os.path.join(now_dir_name, dir_a[person]))
            # Step over complete, non-overlapping windows of `step` files.
            for segment in range(0, len(dir_b) - step + 1, step):
                temp = None
                for i in range(step):
                    loaded = np.loadtxt(os.path.join(now_file_name, dir_b[i + segment]),
                                        delimiter=",")[:, num].T
                    temp = loaded if temp is None else np.r_[temp, loaded]
                now.append(temp)
                label.append(y)
    # L2-normalize each sample along its last dimension.
    x = torch.nn.functional.normalize(torch.tensor(np.array(now), dtype=torch.float32), dim=-1)
    # x = torch.tensor(np.array(now), dtype=torch.float32)  # un-normalized variant
    data = _Data(x, torch.from_numpy(np.array(label)).long())
    return data
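# Assumed on-disk layout, inferred from the nested loops above (not stated
# explicitly in the original): one directory per activity class, one
# sub-directory per person inside it, and one CSV file per recorded segment,
# e.g.
#   ./data/<class_name>/<person>/<segment>.csv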
# data1: normalized without filtering; data2: filtered without normalization;
# data3: filtered and normalized
datafile = "./data.data"
if os.path.exists(datafile):
with open(datafile, 'rb') as f:
data = pickle.load(f)
f.close()
else:
print("Loading data...")
num = [i for i in range(45)]
data = load_dataset('./data/', num, 1)
print("Done")
with open(datafile, 'wb') as f:
pickle.dump(data, f)
f.close()
X_train, X_test, y_train, y_test = train_test_split(data.x, data.y, test_size=0.3, random_state=1)
# Optional experiment (disabled): zero out the last six rows of each test sample.
# for i in range(len(X_test)):
#     for j in range(-6, 0):
#         X_test[i][j][:] = torch.tensor([0] * 125)
print(X_train.shape)
_, SLIDING_WINDOW_LENGTH, NB_SENSOR_CHANNELS = X_train.shape
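# Note on naming: with the loader above, dim 1 of each sample holds the
# len(num) selected sensor columns and dim 2 holds the rows of each segment
# file, so these two constants track tensor dims rather than their literal
# meanings. The model below only needs them to be consistent with the
# reshape in forward(), which they are.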
class HARModel(nn.Module):
    def __init__(self, n_hidden=128, n_layers=1, n_filters=100,
                 n_classes=19, filter_size=1, drop_prob=0.5):
        super(HARModel, self).__init__()
        self.drop_prob = drop_prob
        self.n_layers = n_layers
        self.n_hidden = n_hidden
        self.n_filters = n_filters
        self.n_classes = n_classes
        self.filter_size = (filter_size,)
        # DeepConvLSTM: four stacked 1-D convolutions feeding two LSTM layers.
        self.conv1 = nn.Conv1d(NB_SENSOR_CHANNELS, n_filters, self.filter_size)
        self.conv2 = nn.Conv1d(n_filters, n_filters, self.filter_size)
        self.conv3 = nn.Conv1d(n_filters, n_filters, self.filter_size)
        self.conv4 = nn.Conv1d(n_filters, n_filters, self.filter_size)
        self.lstm1 = nn.LSTM(n_filters, n_hidden, n_layers)
        self.lstm2 = nn.LSTM(n_hidden, n_hidden, n_layers)
        self.fc = nn.Linear(n_hidden, n_classes)
        self.dropout = nn.Dropout(drop_prob)
    def forward(self, x, hidden, batch_size):
        x = x.view(-1, NB_SENSOR_CHANNELS, SLIDING_WINDOW_LENGTH)
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = F.relu(self.conv3(x))
        x = F.relu(self.conv4(x))
        # Reshape conv features to (seq_len, batch, features) for the LSTMs.
        x = x.view(-1, batch_size, self.n_filters)
        # x = x.view(SLIDING_WINDOW_LENGTH, -1, NB_SENSOR_CHANNELS)
        x, hidden = self.lstm1(x, hidden)
        x, hidden = self.lstm2(x, hidden)
        x = x.contiguous().view(-1, self.n_hidden)
        x = self.dropout(x)
        x = self.fc(x)
        # Keep only the prediction at the last time step.
        out = x.view(batch_size, -1, self.n_classes)[:, -1, :]
        return out, hidden
    def init_hidden(self, batch_size):
        ''' Initializes hidden state '''
        # Create two new tensors with sizes n_layers x batch_size x n_hidden,
        # initialized to zero, for hidden state and cell state of the LSTM.
        weight = next(self.parameters()).data
        if train_on_gpu:
            hidden = (weight.new(self.n_layers, batch_size, self.n_hidden).zero_().cuda(),
                      weight.new(self.n_layers, batch_size, self.n_hidden).zero_().cuda())
        else:
            hidden = (weight.new(self.n_layers, batch_size, self.n_hidden).zero_(),
                      weight.new(self.n_layers, batch_size, self.n_hidden).zero_())
        return hidden
net = HARModel()
def init_weights(m):
    # Orthogonal initialization for all weight matrices; zeros for biases.
    if isinstance(m, nn.LSTM):
        for name, param in m.named_parameters():
            if 'weight_ih' in name or 'weight_hh' in name:
                torch.nn.init.orthogonal_(param.data)
            elif 'bias' in name:
                param.data.fill_(0)
    elif isinstance(m, (nn.Conv1d, nn.Linear)):
        torch.nn.init.orthogonal_(m.weight)
        m.bias.data.fill_(0)
net.apply(init_weights)
def iterate_minibatches(inputs, targets, batchsize, shuffle=True):
    assert len(inputs) == len(targets)
    if shuffle:
        indices = np.arange(len(inputs))
        np.random.shuffle(indices)
    for start_idx in range(0, len(inputs) - batchsize + 1, batchsize):
        if shuffle:
            excerpt = indices[start_idx:start_idx + batchsize]
        else:
            excerpt = slice(start_idx, start_idx + batchsize)
        yield inputs[excerpt], targets[excerpt]
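# Note: only complete batches are yielded; a trailing remainder smaller than
# `batchsize` is silently dropped. For example, 10 samples with batchsize=4
# yield two batches and drop the last 2 samples.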
# Check whether a GPU is available.
train_on_gpu = torch.cuda.is_available()
if train_on_gpu:
    print('Training on GPU!')
else:
    print('No GPU available, training on CPU; consider making n_epochs very small.')
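# Optional shape sanity check (a minimal sketch added for illustration, not
# part of the original script): a batch of 4 dummy windows should produce one
# row of class logits per sample.
with torch.no_grad():
    if train_on_gpu:
        net.cuda()
    check_h = net.init_hidden(4)
    check_x = torch.zeros(4, SLIDING_WINDOW_LENGTH, NB_SENSOR_CHANNELS)
    if train_on_gpu:
        check_x = check_x.cuda()
    check_out, _ = net(check_x, check_h, 4)
    assert check_out.shape == (4, net.n_classes)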
def train(net, epochs=1000, batch_size=400, lr=0.001):
    opt = torch.optim.Adam(net.parameters(), lr=lr)
    criterion = nn.CrossEntropyLoss()
    if train_on_gpu:
        net.cuda()
    for e in range(epochs):
        # Initialize the hidden state.
        h = net.init_hidden(batch_size)
        train_losses = []
        net.train()
        for batch in iterate_minibatches(X_train, y_train, batch_size):
            inputs, targets = batch
            if train_on_gpu:
                inputs, targets = inputs.cuda(), targets.cuda()
            # Create new variables for the hidden state; otherwise
            # we would backprop through the entire training history.
            h = tuple([each.data for each in h])
            # Zero accumulated gradients.
            opt.zero_grad()
            # Get the output from the model.
            output, h = net(inputs, h, batch_size)
            loss = criterion(output, targets.long())
            train_losses.append(loss.item())
            loss.backward()
            opt.step()
        val_h = net.init_hidden(batch_size)
        val_losses = []
        accuracy = 0
        f1score = 0
        net.eval()
        with torch.no_grad():
            for batch in iterate_minibatches(X_test, y_test, batch_size):
                inputs, targets = batch
                val_h = tuple([each.data for each in val_h])
                if train_on_gpu:
                    inputs, targets = inputs.cuda(), targets.cuda()
                output, val_h = net(inputs, val_h, batch_size)
                # print(confusion_matrix(y_train, output))
                val_loss = criterion(output, targets.long())
                val_losses.append(val_loss.item())
                top_p, top_class = output.topk(1, dim=1)
                equals = top_class == targets.view(*top_class.shape).long()
                accuracy += torch.mean(equals.type(torch.FloatTensor))
                # sklearn expects (y_true, y_pred) as flat arrays.
                f1score += metrics.f1_score(targets.cpu().view(-1),
                                            top_class.cpu().view(-1),
                                            average='weighted')
        net.train()  # reset to train mode after iterating through validation data
        print("Epoch: {}/{}...".format(e + 1, epochs),
              "Train Loss: {:.4f}...".format(np.mean(train_losses)),
              "Val Loss: {:.4f}...".format(np.mean(val_losses)),
              "Val Acc: {:.4f}...".format(accuracy / (len(X_test) // batch_size)),
              "F1-Score: {:.4f}...".format(f1score / (len(X_test) // batch_size)))
train(net)
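# A minimal sketch for persisting the trained weights afterwards; the
# checkpoint path is an assumption, not part of the original script.
torch.save(net.state_dict(), "./har_model.pt")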