Genesis
Esse commit está contido em:
@@ -0,0 +1,9 @@
|
||||
# Statistical Comparison of Valence-Arousal Classifiers from EEG on DEAP and MANHOB Datasets
|
||||
|
||||
This repository contains pretrained models, code and a report about a statistical study on different valence-arousal classifiers from EEG (Electroencephalography) signals on [DEAP](https://www.eecs.qmul.ac.uk/mmv/datasets/deap/) and [MAHNOB](https://mahnob-db.eu/hci-tagging/) datasets.
|
||||
|
||||
The [report](./report.pdf) compares two classifiers (a deep neural network and a convolutional neural network) against DEAP and MAHNOB, and evaluates if there is a significant difference in performance between the two methods using some statistical tests (like McNemar's test and 5x2cv test).
|
||||
|
||||
## Project structure
|
||||
|
||||
Project structure is explained in Section 7 of the report.
|
||||
Arquivo binário não exibido.
Arquivo binário não exibido.
Arquivo binário não exibido.
Arquivo binário não exibido.
Arquivo binário não exibido.
@@ -0,0 +1,97 @@
|
||||
import os, pickle, math
|
||||
import numpy as np
|
||||
import reduce_dim
|
||||
|
||||
src_dir = '/Users/riccardo/Documents/datasets/DEAP/data_preprocessed_python/'
|
||||
dst_dir = '/Users/riccardo/Documents/datasets/DEAP/deap_preprocessed_standardized_global/'
|
||||
|
||||
n_experiments = 40 # experiment per participant
|
||||
n_channels = 40 # channel per experiment
|
||||
n_recordings = 8064 # recordings per channel
|
||||
|
||||
participants = os.listdir(src_dir)
|
||||
participants.sort()
|
||||
assert len(participants) == 32 # DEAP has 32 participants
|
||||
|
||||
tot_experiments = 40 * 32
|
||||
exported_experiments = 0
|
||||
|
||||
# Global mean and std calculation
|
||||
# total_sum = 0.0
|
||||
# for participant in participants:
|
||||
# participant_data_path = os.path.join(src_dir, participant)
|
||||
# with open(participant_data_path, mode='r+b') as f:
|
||||
# data = pickle.load(f, encoding='latin1')
|
||||
# data, labels = data['data'], data['labels']
|
||||
|
||||
# assert data[:,0:32,:].shape == (n_experiments, 32, n_recordings)
|
||||
|
||||
# total_sum += np.sum(data[:,0:32,:])
|
||||
|
||||
# global_mean = total_sum / (len(participants) * n_experiments * 32 * n_recordings)
|
||||
|
||||
# sum_of_squared_error = 0.0
|
||||
# for participant in participants:
|
||||
# participant_data_path = os.path.join(src_dir, participant)
|
||||
# with open(participant_data_path, mode='r+b') as f:
|
||||
# data = pickle.load(f, encoding='latin1')
|
||||
# data, labels = data['data'], data['labels']
|
||||
|
||||
# assert data[:,0:32,:].shape == (n_experiments, 32, n_recordings)
|
||||
|
||||
# sum_of_squared_error += np.sum(np.power((data[:,0:32,:] - global_mean), np.array([2])))
|
||||
|
||||
# global_std = np.sqrt(sum_of_squared_error / (len(participants) * n_experiments * 32 * n_recordings))
|
||||
|
||||
# print(f'Global mean = {global_mean}, global std = {global_std}')
|
||||
|
||||
# Preprocess data
|
||||
for i, participant in enumerate(participants):
|
||||
print(f'Participant {i+1}/{len(participants)} ({participant})')
|
||||
|
||||
participant_data_path = os.path.join(src_dir, participant)
|
||||
|
||||
with open(participant_data_path, mode='r+b') as f:
|
||||
# encoding needed because DEAP data was pickled with Python2
|
||||
data = pickle.load(f, encoding='latin1')
|
||||
|
||||
data, labels = data['data'], data['labels']
|
||||
assert data.shape == (n_experiments, n_channels, n_recordings)
|
||||
assert labels.shape == (n_experiments, 4)
|
||||
|
||||
for j in range(n_experiments):
|
||||
# Removes non-EEG channels from data
|
||||
data_tmp = data[j,0:32,:]
|
||||
assert data_tmp.shape == (32, n_recordings)
|
||||
|
||||
# Standardize data (globally, and before dimensionality reduction)
|
||||
# data_tmp = (data_tmp - global_mean) / global_std
|
||||
|
||||
data_tmp = reduce_dim.reduce_dim(data_tmp)
|
||||
assert data_tmp.shape == (32, 99)
|
||||
|
||||
# Standardize data (indipendently for each channel, after dim reduction)
|
||||
# for m in range(32):
|
||||
# data_tmp[m] = (data_tmp[m] - np.mean(data_tmp[m])) / np.std(data_tmp[m])
|
||||
|
||||
# Standardize data (for all channels, after dim reduction)
|
||||
data_tmp = (data_tmp - np.mean(data_tmp)) / np.std(data_tmp)
|
||||
|
||||
# Removes all annotations except for valence and arousal
|
||||
label_tmp = labels[j,0:2]
|
||||
assert label_tmp.shape == (2,)
|
||||
|
||||
dat = {
|
||||
'data': data_tmp,
|
||||
'labels': label_tmp,
|
||||
}
|
||||
|
||||
dat_file_path = os.path.join(dst_dir, f'{i+1}_{j+1}.dat')
|
||||
with open(dat_file_path, mode='w+b') as dat_file:
|
||||
pickle.dump(dat, dat_file)
|
||||
|
||||
print(f'{dat_file_path} exported successfully.')
|
||||
exported_experiments += 1
|
||||
|
||||
print('Done.')
|
||||
print(f'Exported {exported_experiments} experiments out of {tot_experiments}.')
|
||||
@@ -0,0 +1,116 @@
|
||||
import mne
|
||||
import os
|
||||
import xml.etree.ElementTree as ET
|
||||
import numpy as np
|
||||
import pickle
|
||||
import reduce_dim
|
||||
|
||||
eeg_channels = [
|
||||
'Fp1', 'AF3', 'F3', 'F7', 'FC5', 'FC1', 'C3', 'T7',
|
||||
'CP5', 'CP1', 'P3', 'P7', 'PO3', 'O1', 'Oz', 'Pz',
|
||||
'Fp2', 'AF4', 'Fz', 'F4', 'F8', 'FC6', 'FC2', 'Cz',
|
||||
'C4', 'T8', 'CP6', 'CP2', 'P4', 'P8', 'PO4', 'O2'
|
||||
]
|
||||
assert len(eeg_channels) == 32
|
||||
|
||||
# dir_path must be a directory containing one subdir for each experiment (session)
|
||||
# each subdirectory must contain both a .bdf file (eeg recordings)
|
||||
# and a .xml file (labels)
|
||||
src_path = '/Users/riccardo/Documents/datasets/MAHNOB/Sessions/'
|
||||
|
||||
# dst_path is the directory which will contain n .dat files, one for each experiment (session)
|
||||
# the folder must already exist
|
||||
dst_path = '/Users/riccardo/Documents/datasets/MAHNOB/mahnob_preprocessed_standardized2_referencedavg_128hz_bandpass/'
|
||||
|
||||
n_sessions_exported = 0
|
||||
sessions = os.listdir(src_path)
|
||||
|
||||
for i, session in enumerate(sessions):
|
||||
print(f'Session {i+1}/{len(sessions)} ({session})')
|
||||
|
||||
session_path = os.path.join(src_path, session)
|
||||
ls = os.listdir(session_path)
|
||||
|
||||
if len(ls) != 2:
|
||||
print(f'Wrong number of files in {session}: expected 2, found {len(ls)}. Skipping this session.')
|
||||
continue
|
||||
|
||||
bdf_file = ls[0] if '.bdf' in ls[0] else ls[1]
|
||||
xml_file = ls[0] if '.xml' in ls[0] else ls[1]
|
||||
|
||||
# Read the .xml file
|
||||
xml = ET.parse(os.path.join(session_path, xml_file))
|
||||
root = xml.getroot()
|
||||
attributes = root.attrib
|
||||
|
||||
if not 'feltVlnc' in attributes or not 'feltArsl' in attributes or not 'sessionId' in attributes:
|
||||
print('No annotations for valence and/or arousal. Skipping this session.')
|
||||
continue
|
||||
|
||||
session_id = int(attributes['sessionId'])
|
||||
|
||||
valence = float(attributes['feltVlnc'])
|
||||
arousal = float(attributes['feltArsl'])
|
||||
#emotion = int(attributes['feltEmo'])
|
||||
#subject = root.getiterator('subject')[0].attrib['id']
|
||||
|
||||
labels = np.array([valence, arousal])
|
||||
|
||||
# Read and preprocess the .bdf file
|
||||
bdf = mne.io.read_raw_bdf(os.path.join(session_path, bdf_file), verbose=False, preload=True)
|
||||
|
||||
# Set EEG reference
|
||||
# mne.set_eeg_reference(bdf, ref_channels=['Cz'], copy=False, verbose=False)
|
||||
mne.set_eeg_reference(bdf, ref_channels='average', copy=False, verbose=False)
|
||||
|
||||
# Apply a 4-45Hz bandpass filter
|
||||
# bdf = bdf.filter(4.0, 45.0, picks=eeg_channels, verbose=False)
|
||||
|
||||
# Picks only the 32 EEG channels
|
||||
data = bdf.get_data(picks=eeg_channels)
|
||||
|
||||
# DEAP has 8064 recordings for each experiment
|
||||
# MAHNOB usualy has more (~19'000) and in variable number,
|
||||
# and also has 30 seconds of measurements
|
||||
# before and after each experiment
|
||||
# So, the middle 8064 recordings are extracted
|
||||
n_target_samples = 8064 * 2
|
||||
n_samples = data.shape[1]
|
||||
|
||||
start_sample = 0
|
||||
end_sample = n_samples - 1
|
||||
|
||||
if n_samples > n_target_samples:
|
||||
start_sample = (n_samples // 2) - (n_target_samples // 2)
|
||||
end_sample = start_sample + n_target_samples
|
||||
elif n_samples < n_target_samples:
|
||||
assert False
|
||||
|
||||
data = data[:,start_sample:end_sample]
|
||||
assert data.shape == (len(eeg_channels), n_target_samples)
|
||||
|
||||
data = reduce_dim.reduce_dim(data)
|
||||
assert data.shape == (len(eeg_channels), 99)
|
||||
|
||||
# Standardize data (indipendently for each channel, after dim reduction)
|
||||
# for m in range(32):
|
||||
# data[m] = (data[m] - np.mean(data[m])) / np.std(data[m])
|
||||
|
||||
# Standardize data (globally, after dim reduction)
|
||||
data = (data - np.mean(data)) / np.std(data)
|
||||
|
||||
# Pack both data and label into a pickled .dat file
|
||||
data = {
|
||||
'data': data,
|
||||
'labels': labels
|
||||
}
|
||||
|
||||
dat_file_path = os.path.join(dst_path, f'{session_id}.dat')
|
||||
dat_file = open(dat_file_path, mode='w+b')
|
||||
pickle.dump(data, dat_file)
|
||||
|
||||
n_sessions_exported += 1
|
||||
print(f'{dat_file_path} exported successfully.')
|
||||
|
||||
print('Done.')
|
||||
print(f'{n_sessions_exported} sessions out of {len(sessions)} exported.')
|
||||
@@ -0,0 +1,63 @@
|
||||
import numpy as np
|
||||
import scipy.stats as sp
|
||||
|
||||
# Input: data array with shape (32, 8064) for DEAP or (32, 8064*2) for MAHNOB
|
||||
# (i.e. 32 eeg-channels with 8064 recordings each)
|
||||
# Output: data array with shape (32, 99)
|
||||
# (the 8064 recordings get chunked to 99 statistical values)
|
||||
def reduce_dim(data):
|
||||
assert (data.shape == (32, 8064) or data.shape == (32, 8064*2))
|
||||
|
||||
processed_data = np.zeros((32, 99))
|
||||
|
||||
for channel_n in range(32):
|
||||
# Divide the 8064 recordings in 10 batches of 807 (10th batch: 801)
|
||||
if data.shape == (32, 8064):
|
||||
batch_size = 807
|
||||
n_samples = 8064
|
||||
elif data.shape == (32, 8064*2):
|
||||
batch_size = 807 * 2
|
||||
n_samples = 8064 * 2
|
||||
|
||||
batch_n = 0
|
||||
|
||||
for batch_n in range(10):
|
||||
if batch_n != 9:
|
||||
batch = data[channel_n,(batch_n*batch_size):(batch_n*batch_size+batch_size)]
|
||||
else:
|
||||
batch = data[channel_n,(batch_n*batch_size):n_samples]
|
||||
|
||||
processed_data[channel_n,(batch_n * 9):(batch_n * 9 + 9)] = np.array([
|
||||
np.mean(batch),
|
||||
np.median(batch),
|
||||
np.max(batch),
|
||||
np.min(batch),
|
||||
np.std(batch),
|
||||
np.var(batch),
|
||||
np.max(batch) - np.min(batch),
|
||||
sp.skew(batch),
|
||||
sp.kurtosis(batch),
|
||||
])
|
||||
|
||||
processed_data[channel_n,90:99] = np.array([
|
||||
np.mean(data[channel_n,:]),
|
||||
np.median(data[channel_n,:]),
|
||||
np.max(data[channel_n,:]),
|
||||
np.min(data[channel_n,:]),
|
||||
np.std(data[channel_n,:]),
|
||||
np.var(data[channel_n,:]),
|
||||
np.max(data[channel_n,:]) - np.min(data[channel_n,:]),
|
||||
sp.skew(data[channel_n,:]),
|
||||
sp.kurtosis(data[channel_n,:]),
|
||||
])
|
||||
|
||||
assert processed_data.shape == (32, 99)
|
||||
return processed_data
|
||||
|
||||
|
||||
# data = np.random.rand(32, 8064) * 100
|
||||
# print(data[2,:])
|
||||
# processed_data = reduce_dim(data)
|
||||
|
||||
# print('=' * 40)
|
||||
# print(processed_data[2,:])
|
||||
@@ -0,0 +1,22 @@
|
||||
DATASET:
|
||||
dataset_to_use: 'deap'
|
||||
deap_dataset_path: '/Users/riccardo/Documents/datasets/DEAP/deap_preprocessed_standardized2/'
|
||||
|
||||
MODEL:
|
||||
model: 'cnn'
|
||||
dropout_probs: [0.25, 0.15, 0.5, 0.25]
|
||||
|
||||
TRAIN:
|
||||
seed: 12
|
||||
|
||||
classification_of: 'arousal'
|
||||
|
||||
train_test_split: [1180, 100]
|
||||
|
||||
batch_size: 50
|
||||
num_epochs: 250
|
||||
lr: 0.001
|
||||
momentum: 0.9
|
||||
|
||||
EXPORT:
|
||||
model_path: '/Users/riccardo/uni/ac/project/report/models'
|
||||
@@ -0,0 +1,22 @@
|
||||
DATASET:
|
||||
dataset_to_use: 'deap'
|
||||
deap_dataset_path: '/Users/riccardo/Documents/datasets/DEAP/deap_preprocessed_standardized2/'
|
||||
|
||||
MODEL:
|
||||
model: 'cnn'
|
||||
dropout_probs: [0.25, 0.15, 0.5, 0.25]
|
||||
|
||||
TRAIN:
|
||||
seed: 12
|
||||
|
||||
classification_of: 'valence'
|
||||
|
||||
train_test_split: [1180, 100]
|
||||
|
||||
batch_size: 50
|
||||
num_epochs: 250
|
||||
lr: 0.001
|
||||
momentum: 0.9
|
||||
|
||||
EXPORT:
|
||||
model_path: '/Users/riccardo/uni/ac/project/report/models'
|
||||
@@ -0,0 +1,23 @@
|
||||
DATASET:
|
||||
dataset_to_use: 'deap'
|
||||
deap_dataset_path: '/Users/riccardo/Documents/datasets/DEAP/deap_preprocessed_standardized2/'
|
||||
|
||||
MODEL:
|
||||
model: 'dnn'
|
||||
sizes: [5000, 500, 1000]
|
||||
dropout_probs: [0.25, 0.5]
|
||||
|
||||
TRAIN:
|
||||
seed: 12
|
||||
|
||||
classification_of: 'arousal'
|
||||
|
||||
train_test_split: [1180, 100]
|
||||
|
||||
batch_size: 310
|
||||
num_epochs: 250
|
||||
lr: 0.0001
|
||||
momentum: 0.0
|
||||
|
||||
EXPORT:
|
||||
model_path: '/Users/riccardo/uni/ac/project/report/models'
|
||||
@@ -0,0 +1,23 @@
|
||||
DATASET:
|
||||
dataset_to_use: 'deap'
|
||||
deap_dataset_path: '/Users/riccardo/Documents/datasets/DEAP/deap_preprocessed_standardized2/'
|
||||
|
||||
MODEL:
|
||||
model: 'dnn'
|
||||
sizes: [5000, 500, 1000]
|
||||
dropout_probs: [0.25, 0.5]
|
||||
|
||||
TRAIN:
|
||||
seed: 12
|
||||
|
||||
classification_of: 'valence'
|
||||
|
||||
train_test_split: [1180, 100]
|
||||
|
||||
batch_size: 310
|
||||
num_epochs: 250
|
||||
lr: 0.0001
|
||||
momentum: 0.0
|
||||
|
||||
EXPORT:
|
||||
model_path: '/Users/riccardo/uni/ac/project/report/models'
|
||||
@@ -0,0 +1,22 @@
|
||||
DATASET:
|
||||
dataset_to_use: 'mahnob'
|
||||
mahnob_dataset_path: '/Users/riccardo/Documents/datasets/MAHNOB/mahnob_preprocessed_standardized2_referenced_128hz/'
|
||||
|
||||
MODEL:
|
||||
model: 'cnn'
|
||||
dropout_probs: [0.1, 0.05, 0.25, 0.25]
|
||||
|
||||
TRAIN:
|
||||
seed: 12
|
||||
|
||||
classification_of: 'arousal'
|
||||
|
||||
train_test_split: [460, 86]
|
||||
|
||||
batch_size: 40
|
||||
num_epochs: 150
|
||||
lr: 0.001
|
||||
momentum: 0.9
|
||||
|
||||
EXPORT:
|
||||
model_path: '/Users/riccardo/uni/ac/project/report/models'
|
||||
@@ -0,0 +1,22 @@
|
||||
DATASET:
|
||||
dataset_to_use: 'mahnob'
|
||||
mahnob_dataset_path: '/Users/riccardo/Documents/datasets/MAHNOB/mahnob_preprocessed_standardized2_referenced_128hz/'
|
||||
|
||||
MODEL:
|
||||
model: 'cnn'
|
||||
dropout_probs: [0.1, 0.05, 0.25, 0.25]
|
||||
|
||||
TRAIN:
|
||||
seed: 12
|
||||
|
||||
classification_of: 'valence'
|
||||
|
||||
train_test_split: [460, 86]
|
||||
|
||||
batch_size: 40
|
||||
num_epochs: 150
|
||||
lr: 0.001
|
||||
momentum: 0.9
|
||||
|
||||
EXPORT:
|
||||
model_path: '/Users/riccardo/uni/ac/project/report/models'
|
||||
@@ -0,0 +1,23 @@
|
||||
DATASET:
|
||||
dataset_to_use: 'mahnob'
|
||||
mahnob_dataset_path: '/Users/riccardo/Documents/datasets/MAHNOB/mahnob_preprocessed_standardized2_referenced_128hz/'
|
||||
|
||||
MODEL:
|
||||
model: 'dnn'
|
||||
sizes: [5000, 500, 1000]
|
||||
dropout_probs: [0.25, 0.5]
|
||||
|
||||
TRAIN:
|
||||
seed: 12
|
||||
|
||||
classification_of: 'arousal'
|
||||
|
||||
train_test_split: [460, 86]
|
||||
|
||||
batch_size: 40
|
||||
num_epochs: 150
|
||||
lr: 0.001
|
||||
momentum: 0.0
|
||||
|
||||
EXPORT:
|
||||
model_path: '/Users/riccardo/uni/ac/project/report/models'
|
||||
@@ -0,0 +1,23 @@
|
||||
DATASET:
|
||||
dataset_to_use: 'mahnob'
|
||||
mahnob_dataset_path: '/Users/riccardo/Documents/datasets/MAHNOB/mahnob_preprocessed_standardized2_referenced_128hz/'
|
||||
|
||||
MODEL:
|
||||
model: 'dnn'
|
||||
sizes: [5000, 500, 1000]
|
||||
dropout_probs: [0.25, 0.5]
|
||||
|
||||
TRAIN:
|
||||
seed: 12
|
||||
|
||||
classification_of: 'valence'
|
||||
|
||||
train_test_split: [460, 86]
|
||||
|
||||
batch_size: 40
|
||||
num_epochs: 150
|
||||
lr: 0.001
|
||||
momentum: 0.0
|
||||
|
||||
EXPORT:
|
||||
model_path: '/Users/riccardo/uni/ac/project/report/models'
|
||||
@@ -0,0 +1,48 @@
|
||||
import os, pickle, torch
|
||||
from torch.utils.data import Dataset
|
||||
|
||||
n_channels = 32
|
||||
n_recordings = 99
|
||||
|
||||
# High val = 708, low val = 572
|
||||
# High ar = 737, low ar = 543
|
||||
class DEAP(Dataset):
|
||||
def __init__(self, dataset_path):
|
||||
self.dataset_path = dataset_path
|
||||
|
||||
self.sessions = os.listdir(dataset_path)
|
||||
|
||||
# remove .DS_Store if present
|
||||
if '.DS_Store' in self.sessions:
|
||||
self.sessions.remove('.DS_Store')
|
||||
|
||||
def __len__(self):
|
||||
return len(self.sessions)
|
||||
|
||||
def __getitem__(self, index):
|
||||
file_path = os.path.join(self.dataset_path, self.sessions[index])
|
||||
with open(file_path, mode='rb') as file:
|
||||
session = pickle.load(file)
|
||||
|
||||
data, labels = session['data'], session['labels']
|
||||
data, labels = torch.from_numpy(data), torch.from_numpy(labels)
|
||||
data, labels = data.float(), labels.float()
|
||||
|
||||
# 1 = high value, 0 = low value
|
||||
labels = (labels >= 5.0).long()
|
||||
|
||||
assert data.shape == (n_channels, n_recordings)
|
||||
assert labels.shape == (2,)
|
||||
|
||||
return data, labels
|
||||
|
||||
|
||||
class MAHNOB(DEAP):
|
||||
def __init__(self, dataset_path):
|
||||
super().__init__(dataset_path)
|
||||
|
||||
def __len__(self):
|
||||
return super().__len__()
|
||||
|
||||
def __getitem__(self, index):
|
||||
return super().__getitem__(index)
|
||||
@@ -0,0 +1,88 @@
|
||||
from torch import nn
|
||||
|
||||
n_channels = 32
|
||||
n_recordings = 99
|
||||
|
||||
class DNN(nn.Module):
|
||||
def __init__(self, sizes=(5000, 500, 1000), dropout_probs=(0.25, 0.5)):
|
||||
super(DNN, self).__init__()
|
||||
|
||||
self.linear1 = nn.Linear(n_channels * n_recordings, sizes[0])
|
||||
self.linear2 = nn.Linear(sizes[0], sizes[1])
|
||||
self.linear3 = nn.Linear(sizes[1], sizes[2])
|
||||
self.linear4 = nn.Linear(sizes[2], 1) # binary classification: high vs low
|
||||
|
||||
self.dropout1 = nn.Dropout(dropout_probs[0])
|
||||
self.dropout2 = nn.Dropout(dropout_probs[1])
|
||||
|
||||
self.relu = nn.ReLU()
|
||||
|
||||
self.flatten = nn.Flatten(start_dim=1)
|
||||
|
||||
def forward(self, x):
|
||||
x = self.flatten(x)
|
||||
|
||||
x = self.dropout1(x)
|
||||
x = self.linear1(x)
|
||||
x = self.relu(x)
|
||||
|
||||
x = self.dropout2(x)
|
||||
x = self.linear2(x)
|
||||
x = self.relu(x)
|
||||
|
||||
x = self.dropout2(x)
|
||||
x = self.linear3(x)
|
||||
x = self.relu(x)
|
||||
|
||||
x = self.dropout2(x)
|
||||
x = self.linear4(x)
|
||||
|
||||
return x
|
||||
|
||||
|
||||
class CNN(nn.Module):
|
||||
def __init__(self, dropout_probs=(0.25, 0.15, 0.5, 0.25)):
|
||||
super(CNN, self).__init__()
|
||||
|
||||
self.conv1 = nn.Conv2d(1, 20, (3, 3), padding=(1, 1))
|
||||
self.conv2 = nn.Conv2d(20, 40, (3, 3), padding=(1, 1))
|
||||
|
||||
self.maxpool = nn.MaxPool2d((2, 2))
|
||||
|
||||
self.linear1 = nn.Linear(40 * 16 * 49, 128)
|
||||
self.linear2 = nn.Linear(128, 1)
|
||||
|
||||
self.tanh = nn.Tanh()
|
||||
self.relu = nn.ReLU()
|
||||
|
||||
self.dropout0 = nn.Dropout(dropout_probs[0])
|
||||
self.dropout1 = nn.Dropout2d(dropout_probs[1])
|
||||
self.dropout2 = nn.Dropout(dropout_probs[2])
|
||||
self.dropout3 = nn.Dropout(dropout_probs[3])
|
||||
|
||||
self.flatten = nn.Flatten(start_dim=1)
|
||||
|
||||
self.softplus = nn.Softplus()
|
||||
|
||||
def forward(self, x):
|
||||
x = x[:,None,:,:] # add dummy dim for channel
|
||||
|
||||
x = self.dropout0(x)
|
||||
|
||||
x = self.relu(self.conv1(x))
|
||||
x = self.dropout1(x)
|
||||
|
||||
x = self.relu(self.conv2(x))
|
||||
x = self.dropout1(x)
|
||||
x = self.maxpool(x)
|
||||
|
||||
x = self.flatten(x)
|
||||
|
||||
x = self.dropout2(x)
|
||||
|
||||
x = self.relu(self.linear1(x))
|
||||
x = self.dropout3(x)
|
||||
|
||||
x = self.linear2(x)
|
||||
|
||||
return x
|
||||
@@ -0,0 +1,103 @@
|
||||
# Imports
|
||||
import torch, yaml, os
|
||||
from torch.utils.data import DataLoader, random_split
|
||||
|
||||
from datasets import DEAP, MAHNOB
|
||||
from models import DNN, CNN
|
||||
from utils import check_train_test_split_balanced
|
||||
from train_utils import train
|
||||
|
||||
# Read configs
|
||||
with open('./scripts/nn/configs/deap_dnn_arousal.yml') as yaml_file:
|
||||
config = yaml.load(yaml_file, Loader=yaml.FullLoader)
|
||||
|
||||
# Hyperparams
|
||||
num_epochs = config['TRAIN']['num_epochs']
|
||||
batch_size = config['TRAIN']['batch_size']
|
||||
lr = config['TRAIN']['lr']
|
||||
momentum = config['TRAIN']['momentum']
|
||||
|
||||
# Model
|
||||
model_type = config['MODEL']['model']
|
||||
|
||||
# Train
|
||||
classification_of = config['TRAIN']['classification_of']
|
||||
|
||||
# Dataset
|
||||
dataset_to_use = config['DATASET']['dataset_to_use']
|
||||
|
||||
# Export
|
||||
model_path = config['EXPORT']['model_path']
|
||||
model_name = f'{dataset_to_use}-{model_type}-{classification_of}'
|
||||
|
||||
if dataset_to_use == 'deap':
|
||||
dataset_path = config['DATASET']['deap_dataset_path']
|
||||
dataset = DEAP(dataset_path)
|
||||
elif dataset_to_use == 'mahnob':
|
||||
dataset_path = config['DATASET']['mahnob_dataset_path']
|
||||
dataset = MAHNOB(dataset_path)
|
||||
else:
|
||||
assert False
|
||||
|
||||
# check_dataset_balanced(dataset_path)
|
||||
|
||||
seed = config['TRAIN']['seed']
|
||||
train_set_size, test_set_size = config['TRAIN']['train_test_split']
|
||||
train_set, test_set = random_split(
|
||||
dataset,
|
||||
[train_set_size, test_set_size],
|
||||
generator=torch.Generator().manual_seed(seed)
|
||||
)
|
||||
|
||||
print(f'{len(dataset)} examples found ({train_set_size} train, {test_set_size} test)')
|
||||
|
||||
train_loader = DataLoader(
|
||||
train_set,
|
||||
batch_size=batch_size,
|
||||
shuffle=True
|
||||
)
|
||||
|
||||
test_loader = DataLoader(
|
||||
test_set,
|
||||
batch_size=batch_size,
|
||||
shuffle=True
|
||||
)
|
||||
|
||||
check_train_test_split_balanced(train_loader, test_loader)
|
||||
|
||||
# Model
|
||||
if model_type == 'dnn':
|
||||
model = DNN(
|
||||
sizes=tuple(config['MODEL']['sizes']),
|
||||
dropout_probs=tuple(config['MODEL']['dropout_probs'])
|
||||
)
|
||||
optimizer = torch.optim.RMSprop(model.parameters(), lr=lr)
|
||||
elif model_type == 'cnn':
|
||||
model = CNN(dropout_probs=tuple(config['MODEL']['dropout_probs']))
|
||||
optimizer = torch.optim.SGD(model.parameters(), lr=lr, momentum=momentum)
|
||||
else:
|
||||
assert False
|
||||
|
||||
def init_weights(m):
|
||||
if isinstance(m, torch.nn.Linear):
|
||||
torch.nn.init.xavier_normal_(m.weight)
|
||||
m.bias.data.fill_(0.0)
|
||||
|
||||
model.apply(init_weights)
|
||||
|
||||
# Launch training
|
||||
best_acc_test_set = train(
|
||||
model,
|
||||
train_loader,
|
||||
torch.nn.BCEWithLogitsLoss(),
|
||||
optimizer,
|
||||
classification_of=classification_of,
|
||||
num_epochs=num_epochs,
|
||||
do_check_accuracy=True,
|
||||
check_accuracy_every=1,
|
||||
test_loader=test_loader,
|
||||
model_path=model_path,
|
||||
model_name=model_name,
|
||||
)
|
||||
|
||||
os.system(f'say Best test accuracy {(best_acc_test_set*100):.2f}%')
|
||||
@@ -0,0 +1,90 @@
|
||||
import torch
|
||||
import numpy as np
|
||||
from utils import save_model
|
||||
|
||||
def check_accuracy(model, data_loader, classification_of='valence', num_examples=-1):
|
||||
num_corrects = 0
|
||||
|
||||
if num_examples == -1:
|
||||
num_examples = len(data_loader.dataset)
|
||||
|
||||
with torch.no_grad():
|
||||
model.eval()
|
||||
|
||||
for data, labels in data_loader:
|
||||
preds = model(data)
|
||||
preds = torch.squeeze((preds >= 0.0).long())
|
||||
labels = labels[:,0] if classification_of == 'valence' else labels[:,1]
|
||||
|
||||
assert preds.shape == labels.shape
|
||||
|
||||
num_corrects += torch.sum((preds == labels).long())
|
||||
|
||||
model.train()
|
||||
|
||||
return (num_corrects / num_examples).item()
|
||||
|
||||
def train(
|
||||
model,
|
||||
train_loader,
|
||||
criterion,
|
||||
optimizer,
|
||||
classification_of='valence',
|
||||
num_epochs=100,
|
||||
do_check_accuracy=True,
|
||||
test_loader=None,
|
||||
model_path=None,
|
||||
model_name=None,
|
||||
check_accuracy_every=50
|
||||
):
|
||||
model.train()
|
||||
|
||||
best_acc_test_set = 0.0
|
||||
|
||||
avg_loss_per_epoch = []
|
||||
accuracy_per_epoch = []
|
||||
|
||||
for epoch_n in range(1, num_epochs+1):
|
||||
epoch_losses = []
|
||||
|
||||
for batch_i, (data, labels) in enumerate(train_loader, start=1):
|
||||
preds = torch.squeeze(model(data))
|
||||
labels = labels[:,0].float() if classification_of == 'valence' else labels[:,1].float()
|
||||
|
||||
loss = criterion(preds, labels)
|
||||
|
||||
if len(data) == train_loader.batch_size:
|
||||
epoch_losses.append(loss.item())
|
||||
|
||||
optimizer.zero_grad()
|
||||
loss.backward()
|
||||
optimizer.step()
|
||||
|
||||
print(f'\rEPOCH {epoch_n}/{num_epochs}: batch {batch_i}: {loss:.3f}', end='')
|
||||
|
||||
avg_epoch_loss = np.mean(epoch_losses)
|
||||
avg_loss_per_epoch.append(avg_epoch_loss)
|
||||
print(f' (Avg epoch loss = {avg_epoch_loss:.3f})', end='')
|
||||
|
||||
if do_check_accuracy and epoch_n % check_accuracy_every == 0:
|
||||
if test_loader == None or model_path == None or model_name == None:
|
||||
assert False
|
||||
|
||||
print('\nChecking accuracy on training set... ', end=' ')
|
||||
acc_train_set = check_accuracy(model, train_loader, classification_of=classification_of)
|
||||
print(f'{(acc_train_set*100):.2f}%')
|
||||
|
||||
print('Testing accuracy on test set...', end=' ')
|
||||
acc_test_set = check_accuracy(model, test_loader, classification_of=classification_of)
|
||||
print(f'{(acc_test_set * 100):.2f}%')
|
||||
accuracy_per_epoch.append(acc_test_set)
|
||||
|
||||
if acc_test_set > best_acc_test_set:
|
||||
save_model(model, model_path=model_path, model_name=model_name)
|
||||
best_acc_test_set = acc_test_set
|
||||
|
||||
print('\n')
|
||||
|
||||
if best_acc_test_set != 0.0:
|
||||
print(f'Best accuracy on test set: {(best_acc_test_set*100):.2f}%')
|
||||
return best_acc_test_set
|
||||
@@ -0,0 +1,84 @@
|
||||
import datetime, math, os, torch, pickle
|
||||
import matplotlib.pyplot as plt
|
||||
|
||||
def get_current_timestamp():
|
||||
return datetime.datetime.now().strftime("%Y%m%d%H%M%S")
|
||||
|
||||
def get_train_test_sizes(train_set_perc, dataset_size):
|
||||
train_set_size = math.floor(dataset_size / 100 * train_set_perc)
|
||||
test_set_size = dataset_size - train_set_size
|
||||
|
||||
return train_set_size, test_set_size
|
||||
|
||||
def plot_loss_and_accuracy_per_epoch(loss_per_epoch, accuracy_per_epoch, model_path):
|
||||
fig = plt.figure()
|
||||
fig.add_subplot(1, 2, 1)
|
||||
plt.title('Average loss per epoch')
|
||||
plt.xlabel('epoch')
|
||||
plt.ylabel('avg loss')
|
||||
plt.plot(range(1, len(loss_per_epoch) + 1), loss_per_epoch)
|
||||
|
||||
fig.add_subplot(1, 2, 2)
|
||||
plt.title('Accuracy per epoch')
|
||||
plt.xlabel('epoch')
|
||||
plt.ylabel('accuracy')
|
||||
plt.plot(range(1, len(accuracy_per_epoch) + 1), accuracy_per_epoch)
|
||||
|
||||
plt.savefig(os.path.join(model_path, f'{get_current_timestamp()}-accuracy-plot.png'))
|
||||
# plt.show()
|
||||
|
||||
def save_model(model, model_path='/', model_name='model'):
|
||||
model_final_path = f'{os.path.join(model_path, model_name)}.pt'
|
||||
torch.save(model.state_dict(), model_final_path)
|
||||
|
||||
print(f'Model saved: {model_final_path}')
|
||||
|
||||
def check_dataset_balanced(dataset_path):
|
||||
high_valence_count = 0
|
||||
low_valence_count = 0
|
||||
high_arousal_count = 0
|
||||
low_arousal_count = 0
|
||||
|
||||
ls = os.listdir(dataset_path)
|
||||
|
||||
if '.DS_Store' in ls:
|
||||
ls.remove('.DS_Store')
|
||||
|
||||
for filepath in ls:
|
||||
with open(os.path.join(dataset_path, filepath), mode='rb') as f:
|
||||
data = pickle.load(f)
|
||||
_, labels = data['data'], data['labels']
|
||||
|
||||
if labels[0] > 5.0:
|
||||
high_valence_count += 1
|
||||
else:
|
||||
low_valence_count += 1
|
||||
|
||||
if labels[1] > 5.0:
|
||||
high_arousal_count += 1
|
||||
else:
|
||||
low_arousal_count += 1
|
||||
|
||||
print(f'Total examples = {len(ls)}')
|
||||
print(f'High valence = {high_valence_count} ({(high_valence_count/len(ls)*100):.2f}%),', end=' ')
|
||||
print(f'Low valence = {low_valence_count} ({(low_valence_count/len(ls)*100):.2f}%)')
|
||||
print(f'High arousal = {high_arousal_count} ({(high_arousal_count/len(ls)*100):.2f}%),', end=' ')
|
||||
print(f'Low arousal = {low_arousal_count} ({(low_arousal_count/len(ls)*100):.2f}%)')
|
||||
|
||||
def check_train_test_split_balanced(train_loader, test_loader):
|
||||
train_set = train_loader.dataset
|
||||
test_set = test_loader.dataset
|
||||
|
||||
for loader in [train_loader, test_loader]:
|
||||
num_high = 0
|
||||
for data, labels in loader:
|
||||
labels = labels[:,0]
|
||||
num_high += torch.sum(labels)
|
||||
|
||||
if loader == train_loader:
|
||||
print(f'Train set: L // H = {len(train_set)-num_high} // {num_high} = {((len(train_set)-num_high)/len(train_set)*100):.2f}% // {(num_high/len(train_set)*100):.2f}%')
|
||||
else:
|
||||
print(f'Test set: L // H = {len(test_set)-num_high} // {num_high} = {((len(test_set)-num_high)/len(test_set)*100):.2f} // {(num_high/len(test_set)*100):.2f}%')
|
||||
|
||||
def count_model_parameters(model):
|
||||
return sum(p.numel() for p in model.parameters() if p.requires_grad)
|
||||
@@ -0,0 +1,152 @@
|
||||
import torch, os, sys, yaml
|
||||
from torch.utils.data import DataLoader, SubsetRandomSampler
|
||||
import numpy as np
|
||||
from sklearn.model_selection import KFold
|
||||
|
||||
sys.path.insert(1, os.path.abspath('./scripts/nn/'))
|
||||
|
||||
from datasets import DEAP, MAHNOB
|
||||
from models import DNN, CNN
|
||||
from train_utils import train, check_accuracy
|
||||
from utils import get_current_timestamp
|
||||
|
||||
# Config file
|
||||
with open('./scripts/nn/configs/deap_dnn_valence.yml') as yaml_file:
|
||||
config = yaml.load(yaml_file, Loader=yaml.FullLoader)
|
||||
|
||||
# Hyperparams
|
||||
num_epochs = 250
|
||||
batch_size = config['TRAIN']['batch_size']
|
||||
lr = config['TRAIN']['lr']
|
||||
momentum = config['TRAIN']['momentum']
|
||||
|
||||
# Model
|
||||
model_type = config['MODEL']['model']
|
||||
|
||||
# Train
|
||||
classification_of = config['TRAIN']['classification_of']
|
||||
|
||||
# Dataset
|
||||
dataset_to_use = config['DATASET']['dataset_to_use']
|
||||
|
||||
if dataset_to_use == 'deap':
|
||||
dataset_path = config['DATASET']['deap_dataset_path']
|
||||
dataset = DEAP(dataset_path)
|
||||
elif dataset_to_use == 'mahnob':
|
||||
dataset_path = config['DATASET']['mahnob_dataset_path']
|
||||
dataset = MAHNOB(dataset_path)
|
||||
else:
|
||||
assert False
|
||||
|
||||
# cross validation
|
||||
def cross_validation(kfold):
|
||||
accuracies = []
|
||||
|
||||
for fold, (train_idx, test_idx) in enumerate(kfold.split(np.arange(len(dataset)))):
|
||||
print('=' * 50)
|
||||
print(f'Fold #{fold+1}')
|
||||
|
||||
# Build data loader from k-fold indices
|
||||
train_sampler = SubsetRandomSampler(train_idx)
|
||||
test_sampler = SubsetRandomSampler(test_idx)
|
||||
|
||||
train_loader = DataLoader(dataset, batch_size=batch_size, sampler=train_sampler)
|
||||
test_loader = DataLoader(dataset, batch_size=batch_size, sampler=test_sampler)
|
||||
|
||||
# Build models
|
||||
dnn = DNN(
|
||||
sizes=(5000, 500, 1000),
|
||||
dropout_probs=(0.25, 0.5)
|
||||
)
|
||||
dnn_optimizer = torch.optim.RMSprop(dnn.parameters(), lr=lr)
|
||||
|
||||
cnn = CNN(dropout_probs=(0.25, 0.15, 0.5, 0.25))
|
||||
cnn_optimizer = torch.optim.SGD(cnn.parameters(), lr=lr, momentum=momentum)
|
||||
|
||||
models_optimizers = [(dnn, dnn_optimizer), (cnn, cnn_optimizer)]
|
||||
|
||||
for model_i, (model, optimizer) in enumerate(models_optimizers):
|
||||
print(f'Model {model_i+1}')
|
||||
|
||||
# Train on training fold
|
||||
train(
|
||||
model,
|
||||
train_loader,
|
||||
torch.nn.BCEWithLogitsLoss(),
|
||||
optimizer,
|
||||
classification_of='valence',
|
||||
num_epochs=num_epochs,
|
||||
do_check_accuracy=False,
|
||||
)
|
||||
|
||||
# Check accuracy of model on test fold
|
||||
num_examples = len(test_loader.sampler)
|
||||
accuracy = check_accuracy(
|
||||
model,
|
||||
test_loader,
|
||||
num_examples=num_examples,
|
||||
classification_of='valence'
|
||||
)
|
||||
|
||||
accuracies.append(accuracy)
|
||||
|
||||
print(f'Test accuracy of model {model_i+1} on fold {fold+1} = {(accuracy*100):.2f}%')
|
||||
|
||||
return accuracies
|
||||
|
||||
|
||||
# 5x2cv
|
||||
p_vars = []
|
||||
p1_first_iteration = None
|
||||
|
||||
for i in range(5):
|
||||
print('/' * 50)
|
||||
print(f'CV ITERATION #{i+1}')
|
||||
print('/' * 50)
|
||||
|
||||
kfold = KFold(n_splits=2, shuffle=True)
|
||||
|
||||
accuracies = cross_validation(kfold)
|
||||
|
||||
assert len(accuracies) == 4
|
||||
|
||||
p1_dnn = accuracies[0]
|
||||
p1_cnn = accuracies[1]
|
||||
p2_dnn = accuracies[2]
|
||||
p2_cnn = accuracies[3]
|
||||
|
||||
p1 = p1_dnn - p1_cnn
|
||||
p2 = p2_dnn - p2_cnn
|
||||
|
||||
p_avg = (p1 + p2) / 2
|
||||
p_var = (p1 - p_avg)**2 + (p2 - p_avg)**2
|
||||
|
||||
p_vars.append(p_var)
|
||||
|
||||
# first iteration only
|
||||
if i == 0:
|
||||
p1_first_iteration = p1
|
||||
|
||||
# Compute the 5x2cv test statistic
|
||||
t = p1_first_iteration / np.sqrt(0.2 * np.sum(p_vars))
|
||||
|
||||
print(f'p_vars = {p_vars}')
|
||||
print(f'p1_first_iteration = {p1_first_iteration}')
|
||||
print(f't statistic = {t}')
|
||||
|
||||
with open(f'{get_current_timestamp()}-5x2cv-results-{dataset_to_use}.txt', mode='w') as f:
|
||||
print(config, file=f)
|
||||
print(f'Number of epochs = {num_epochs}', file=f)
|
||||
print('\n', file=f)
|
||||
print(f't statistic = {t}', file=f)
|
||||
|
||||
# Alpha value of 0.05
|
||||
# Under null hypothesis, t has a t-distribution with 5 degrees of freedom
|
||||
# ===> t > 2.571 with probability < 5%
|
||||
# If t > 2.571 => we can reject the null hypothesis
|
||||
# If t < 2.571 => we fail to reject the null hypothesis
|
||||
|
||||
if t >= 2.571:
|
||||
print('Reject H0')
|
||||
else:
|
||||
print('Fail to reject H0')
|
||||
@@ -0,0 +1,14 @@
|
||||
from statsmodels.stats.proportion import proportion_confint
|
||||
|
||||
model_performances = {
|
||||
'DNN_DEAP': [71, 100], # [n_of_corrects, n_total]
|
||||
'DNN_MAHNOB': [57, 86],
|
||||
'CNN_DEAP': [66, 100],
|
||||
'CNN_MAHNOB': [56, 86],
|
||||
}
|
||||
|
||||
confidence = 0.95
|
||||
|
||||
for model_name, model_performance in model_performances.items():
|
||||
lower, upper = proportion_confint(model_performance[0], model_performance[1], 1 - confidence)
|
||||
print(f'{model_name}: {(model_performance[0] / model_performance[1]):.3f}, lower={lower:.3f}, upper={upper:.3f}')
|
||||
@@ -0,0 +1,107 @@
|
||||
import torch, os, sys, yaml
|
||||
from torch.utils.data import DataLoader, SubsetRandomSampler
|
||||
import numpy as np
|
||||
from sklearn.model_selection import KFold
|
||||
|
||||
sys.path.insert(1, os.path.abspath('./scripts/nn/'))
|
||||
|
||||
from datasets import DEAP, MAHNOB
|
||||
from models import DNN, CNN
|
||||
from train_utils import train, check_accuracy
|
||||
from utils import get_current_timestamp
|
||||
|
||||
# Config file
|
||||
with open('./scripts/nn/configs/deap_dnn_arousal.yml') as yaml_file:
|
||||
config = yaml.load(yaml_file, Loader=yaml.FullLoader)
|
||||
|
||||
# Hyperparams
|
||||
num_epochs = 150
|
||||
batch_size = config['TRAIN']['batch_size']
|
||||
lr = config['TRAIN']['lr']
|
||||
momentum = config['TRAIN']['momentum']
|
||||
|
||||
# Model
|
||||
model_type = config['MODEL']['model']
|
||||
|
||||
# Train
|
||||
classification_of = config['TRAIN']['classification_of']
|
||||
|
||||
# Dataset
|
||||
dataset_to_use = config['DATASET']['dataset_to_use']
|
||||
|
||||
if dataset_to_use == 'deap':
|
||||
dataset_path = config['DATASET']['deap_dataset_path']
|
||||
dataset = DEAP(dataset_path)
|
||||
elif dataset_to_use == 'mahnob':
|
||||
dataset_path = config['DATASET']['mahnob_dataset_path']
|
||||
dataset = MAHNOB(dataset_path)
|
||||
else:
|
||||
assert False
|
||||
|
||||
|
||||
# K-fold external cross validation
|
||||
n_splits = 32
|
||||
kfold = KFold(n_splits=n_splits, shuffle=True)
|
||||
|
||||
accuracies = []
|
||||
|
||||
for fold, (train_idx, test_idx) in enumerate(kfold.split(np.arange(len(dataset)))):
|
||||
print('=' * 50)
|
||||
print(f'Fold #{fold+1}')
|
||||
|
||||
# Build data loader from k-fold indices
|
||||
train_sampler = SubsetRandomSampler(train_idx)
|
||||
test_sampler = SubsetRandomSampler(test_idx)
|
||||
|
||||
train_loader = DataLoader(dataset, batch_size=batch_size, sampler=train_sampler)
|
||||
test_loader = DataLoader(dataset, batch_size=batch_size, sampler=test_sampler)
|
||||
|
||||
# Build model
|
||||
if model_type == 'dnn':
|
||||
model = DNN(
|
||||
sizes=tuple(config['MODEL']['sizes']),
|
||||
dropout_probs=tuple(config['MODEL']['dropout_probs'])
|
||||
)
|
||||
optimizer = torch.optim.RMSprop(model.parameters(), lr=lr)
|
||||
elif model_type == 'cnn':
|
||||
model = CNN(dropout_probs=tuple(config['MODEL']['dropout_probs']))
|
||||
optimizer = torch.optim.SGD(model.parameters(), lr=lr, momentum=momentum)
|
||||
else:
|
||||
assert False
|
||||
|
||||
# Train model on training fold
|
||||
train(
|
||||
model,
|
||||
train_loader,
|
||||
torch.nn.BCEWithLogitsLoss(),
|
||||
optimizer,
|
||||
classification_of='valence',
|
||||
num_epochs=num_epochs,
|
||||
do_check_accuracy=False,
|
||||
)
|
||||
|
||||
# Check accuracy of model on test fold
|
||||
num_examples = len(test_loader.sampler)
|
||||
accuracy = check_accuracy(
|
||||
model,
|
||||
test_loader,
|
||||
num_examples=num_examples,
|
||||
classification_of='valence'
|
||||
)
|
||||
|
||||
print(f'Test accuracy on fold {fold+1} = {(accuracy*100):.3f}%')
|
||||
|
||||
accuracies.append(accuracy)
|
||||
|
||||
avg_accuracy = np.mean(np.array(accuracies))
|
||||
print(f'Average accuracy = {(avg_accuracy*100):.2f}%')
|
||||
|
||||
with open(f'{get_current_timestamp()}-cv-results-{dataset_to_use}-{model_type}.txt', mode='w') as f:
|
||||
print(f'Fold accuracies = {accuracies}', file=f)
|
||||
print(f'Avg accuracy = {(avg_accuracy*100):.2f}%', file=f)
|
||||
print('\n', file=f)
|
||||
print(config, file=f)
|
||||
print(f'Number of folds = {n_splits}', file=f)
|
||||
print(f'Number of epochs = {num_epochs}', file=f)
|
||||
|
||||
os.system(f'say Average k-fold accuracy: {(avg_accuracy*100):.2f}%')
|
||||
@@ -0,0 +1,118 @@
|
||||
import torch, yaml, os, sys
|
||||
import numpy as np
|
||||
|
||||
from torch.utils.data import DataLoader, random_split
|
||||
from statsmodels.stats.contingency_tables import mcnemar
|
||||
|
||||
sys.path.append(os.path.abspath('./scripts/nn/'))
|
||||
|
||||
from datasets import DEAP, MAHNOB
|
||||
from models import DNN, CNN
|
||||
|
||||
# Load config used for training
|
||||
# Used to get dataset_path, train/test split, etc.
|
||||
# but not training hyperparams and alike which aren't needed
|
||||
with open('./scripts/nn/configs/deap_dnn_valence.yml') as yaml_file:
|
||||
config = yaml.load(yaml_file, Loader=yaml.FullLoader)
|
||||
|
||||
|
||||
device = 'cuda' if torch.cuda.is_available() else 'cpu'
|
||||
|
||||
dataset_to_use = config['DATASET']['dataset_to_use']
|
||||
batch_size = 1
|
||||
dnn_model_path = f'./pretrained-models/{dataset_to_use}-dnn-valence.pt'
|
||||
cnn_model_path = f'./pretrained-models/{dataset_to_use}-cnn-valence.pt'
|
||||
|
||||
# Dataset and data loaders
|
||||
if dataset_to_use == 'deap':
|
||||
dataset_path = config['DATASET']['deap_dataset_path']
|
||||
dataset = DEAP(dataset_path)
|
||||
elif dataset_to_use == 'mahnob':
|
||||
dataset_path = config['DATASET']['mahnob_dataset_path']
|
||||
dataset = MAHNOB(dataset_path)
|
||||
else:
|
||||
assert False
|
||||
|
||||
seed = config['TRAIN']['seed']
|
||||
train_set_size, test_set_size = config['TRAIN']['train_test_split']
|
||||
train_set, test_set = random_split(
|
||||
dataset,
|
||||
[train_set_size, test_set_size],
|
||||
generator=torch.Generator().manual_seed(seed)
|
||||
)
|
||||
|
||||
print(f'{len(dataset)} examples found ({train_set_size} train, {test_set_size} test)')
|
||||
|
||||
train_loader = DataLoader(
|
||||
train_set,
|
||||
batch_size=batch_size,
|
||||
shuffle=True
|
||||
)
|
||||
|
||||
test_loader = DataLoader(
|
||||
test_set,
|
||||
batch_size=batch_size,
|
||||
shuffle=True
|
||||
)
|
||||
|
||||
# Create models and load pretrained weights
|
||||
dnn_model = DNN(
|
||||
sizes=(5000, 500, 1000),
|
||||
dropout_probs=(0.25, 0.5)
|
||||
)
|
||||
dnn_model.load_state_dict(torch.load(dnn_model_path, map_location=device))
|
||||
dnn_model.eval()
|
||||
|
||||
cnn_model = CNN(
|
||||
dropout_probs=(0.25, 0.15, 0.5, 0.25)
|
||||
)
|
||||
cnn_model.load_state_dict(torch.load(cnn_model_path, map_location=device))
|
||||
cnn_model.eval()
|
||||
|
||||
# McNemar's test
|
||||
|
||||
n00 = 0 # examples misclassified by both models
|
||||
n10 = 0 # examples misclassified by cnn, but not by dnn
|
||||
n01 = 0 # examples misclassified by dnn, but not by cnn
|
||||
n11 = 0 # examples classified correctly by both models
|
||||
|
||||
for data, label in test_loader:
|
||||
with torch.no_grad():
|
||||
dnn_model_pred = dnn_model(data)
|
||||
cnn_model_pred = cnn_model(data)
|
||||
|
||||
dnn_model_pred = torch.squeeze((dnn_model_pred >= 0.0).long())
|
||||
cnn_model_pred = torch.squeeze((cnn_model_pred >= 0.0).long())
|
||||
|
||||
label = label[0,0]
|
||||
|
||||
if dnn_model_pred != label and cnn_model_pred != label:
|
||||
n00 += 1
|
||||
elif dnn_model_pred == label and cnn_model_pred != label:
|
||||
n10 += 1
|
||||
elif dnn_model_pred != label and cnn_model_pred == label:
|
||||
n01 += 1
|
||||
elif dnn_model_pred == label and cnn_model_pred == label:
|
||||
n11 += 1
|
||||
else:
|
||||
assert False
|
||||
|
||||
print(f'n00={n00}, n01={n01}, n10={n10}, n11={n11}')
|
||||
print(f'n00 + n01 + n10 + n11 = {n00 + n01 + n10 + n11}')
|
||||
assert (n00 + n01 + n10 + n11) == len(test_set)
|
||||
|
||||
contingency_table = [
|
||||
[n11, n10],
|
||||
[n01, n00]
|
||||
]
|
||||
|
||||
# Calculate McNemar's statistic
|
||||
result = mcnemar(contingency_table, exact=True)
|
||||
|
||||
print('statistic=%.3f, p-value=%.3f' % (result.statistic, result.pvalue))
|
||||
|
||||
alpha = 0.05
|
||||
if result.pvalue > alpha:
|
||||
print('Null hypothesis cannot be rejected. The two models have NO meaningfully different performances.')
|
||||
else:
|
||||
print('Null hypothesis can be rejected. The two models have meaningfully different performances.')
|
||||
Referência em uma Nova Issue
Bloquear um usuário