468 linhas
16 KiB
Python
468 linhas
16 KiB
Python
#!/usr/bin/env python
|
||
# coding: utf-8
|
||
|
||
# In[ ]:
|
||
|
||
|
||
# -*- coding: utf-8 -*-
|
||
"""feature_extraction_25GB_RAM.ipynb
|
||
|
||
Automatically generated by Colaboratory.
|
||
|
||
Original file is located at
|
||
https://colab.research.google.com/drive/1QnVj7GyyJhLPrYF4vBTppMwynXqmOTEJ
|
||
"""
|
||
|
||
# Commented out IPython magic to ensure Python compatibility.
|
||
|
||
|
||
import EEGExtract as eeg
|
||
from scipy import io,signal
|
||
import numpy as np
|
||
import pandas as pd
|
||
from sklearn import preprocessing
|
||
import pandas as pd
|
||
import pickle
|
||
|
||
|
||
class load_data:
|
||
'''
|
||
Load the preprocessed data here, store the paramters
|
||
'''
|
||
def __init__(self,name):
|
||
self.name = name #name of dataset
|
||
self.X = None
|
||
self.Y = None
|
||
self.Z = None
|
||
self.freq = None #(in Hz) is same for all datasets
|
||
self.channels = None
|
||
self.ch_type = 'eeg'
|
||
self.eegData = None
|
||
self.use_autoreject = 'n'
|
||
def load_arrays(self):
|
||
if self.name == 'DREAMER':
|
||
array = np.load('original_data/DREAMER.npz')
|
||
self.freq = 128
|
||
self.channels = ['AF3','F7','F3','FC5','T7','P7','O1','O2','P8','T8','FC6','F4','F8','AF4']
|
||
if self.name == 'DEAP':
|
||
array = np.load('original_data/DEAP.npz')
|
||
self.freq = 128
|
||
# 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
|
||
self.channels = ['F1', 'AF3', 'F3', 'F7', 'FC5', 'FC1', 'C3', 'T7', 'CP5', 'CP1', 'P3', 'P7', 'PO3', 'O1', 'Oz', 'Pz', 'Fp2', 'AF4', 'Fz', 'F4', 'F8', 'FC6', 'FC2', 'Cz', 'C4', 'T8', 'CP6', 'CP2', 'P4', 'P8', 'PO4', 'O2', 'hEOG','vEOG', 'zEMG','tEMG','GSR','Respiration belt','Plethysmograph','Temperature']
|
||
if self.name == 'OASIS':
|
||
#array = np.load('original_data/OASIS.npz')
|
||
if self.use_autoreject == 'y':
|
||
with open('preprocessed_data/oasis/with_autoreject.p','rb') as file:
|
||
self.X = pickle.load(file)
|
||
self.channels = ['AF3', 'F7', 'F3', 'FC5', 'T7', 'P7', 'O1', 'O2', 'P8', 'T8', 'FC6', 'F4', 'F8', 'AF4']
|
||
self.freq = 128
|
||
self.X ,self.Y= merge_dictionary(self.X)
|
||
(a,b,c) = self.X.shape
|
||
self.X = np.reshape(self.X,(a,c,b))
|
||
else:
|
||
array = np.load('preprocessed_data/oasis/without_autoreject.npz')
|
||
self.freq = 128
|
||
self.channels = ['AF3','F7','F3','FC5','T7','P7','O1','O2','P8','T8','FC6','F4','F8','AF4']
|
||
self.X = array['X']
|
||
self.Y = array['Y']
|
||
(a,b,c) = self.X.shape
|
||
self.X = np.reshape(self.X,(a,c,b))
|
||
|
||
else:
|
||
self.X = array['X']
|
||
if self.name == 'DEAP':
|
||
self.X = self.X[:,:,[1,3,2,4,7,11,13,31,29,25,21,19,20,17]] # To maintain uniformity across all datasets, only 14 electrodes selected
|
||
self.channels = ['AF3', 'F7', 'F3', 'FC5', 'T7', 'P7', 'O1', 'O2', 'P8', 'T8', 'FC6', 'F4', 'F8', 'AF4']
|
||
if self.name != 'OASIS':
|
||
self.Y = array['Y']
|
||
#self.Z = array['Z']
|
||
self.reshape_data()
|
||
def reshape_data(self):
|
||
'''
|
||
reshapes data in the format EEGExtract module expects i.e channels x timepoints x epochs
|
||
'''
|
||
|
||
(epochs,timepoints,channels) = self.X.shape
|
||
self.eegData = np.reshape(self.X,(channels,timepoints,epochs))
|
||
|
||
class features:
|
||
############################ Complexity Features #############################
|
||
#1>
|
||
@staticmethod
|
||
def ShannonRes(eegData,**args):
|
||
#Shannon Entropy
|
||
ShannonRes = eeg.shannonEntropy(eegData, bin_min=-200, bin_max=200, binWidth=2)
|
||
return ShannonRes
|
||
#2>
|
||
@staticmethod
|
||
def ShannonRes_sub_band_delta(eegData,fs):
|
||
# Subband Information Quantity
|
||
# delta (0.5–4 Hz)
|
||
eegData_delta = eeg.filt_data(eegData, 0.5, 4, fs)
|
||
ShannonRes_delta = eeg.shannonEntropy(eegData_delta, bin_min=-200, bin_max=200, binWidth=2)
|
||
return ShannonRes_delta
|
||
#3>
|
||
@staticmethod
|
||
def ShannonRes_sub_band_theta(eegData,fs):
|
||
# theta (4–8 Hz)
|
||
eegData_theta = eeg.filt_data(eegData, 4, 8, fs)
|
||
ShannonRes_theta = eeg.shannonEntropy(eegData_theta, bin_min=-200, bin_max=200, binWidth=2)
|
||
return ShannonRes_theta
|
||
|
||
#4>
|
||
@staticmethod
|
||
def ShannonRes_sub_band_alpha(eegData,fs):
|
||
# alpha (8–12 Hz)
|
||
eegData_alpha = eeg.filt_data(eegData, 8, 12, fs)
|
||
ShannonRes_alpha = eeg.shannonEntropy(eegData_alpha, bin_min=-200, bin_max=200, binWidth=2)
|
||
return ShannonRes_alpha
|
||
|
||
#5>
|
||
@staticmethod
|
||
def ShannonRes_sub_band_beta(eegData,fs):
|
||
# beta (12–30 Hz)
|
||
eegData_beta = eeg.filt_data(eegData, 12, 30, fs)
|
||
ShannonRes_beta = eeg.shannonEntropy(eegData_beta, bin_min=-200, bin_max=200, binWidth=2)
|
||
return ShannonRes_beta
|
||
|
||
#6>
|
||
@staticmethod
|
||
def ShannonRes_sub_band_gamma(eegData,fs):
|
||
# gamma (30–100 Hz)
|
||
eegData_gamma = eeg.filt_data(eegData, 30, 63, fs)
|
||
ShannonRes_gamma = eeg.shannonEntropy(eegData_gamma, bin_min=-200, bin_max=200, binWidth=2)
|
||
return ShannonRes_gamma
|
||
|
||
|
||
#7>
|
||
@staticmethod
|
||
def Hojorth_Mobility(eegData,**args):
|
||
# Hjorth Mobility
|
||
# Hjorth Complexity
|
||
HjorthMob, HjorthComp = eeg.hjorthParameters(eegData)
|
||
return HjorthMob
|
||
#8>
|
||
@staticmethod
|
||
def Hojorth_Complexity(eegData,**args):
|
||
# Hjorth Mobility
|
||
# Hjorth Complexity
|
||
HjorthMob, HjorthComp = eeg.hjorthParameters(eegData)
|
||
return HjorthComp
|
||
#9>
|
||
@staticmethod
|
||
def False_Nearest_Neighbour(eegData,**args):
|
||
# False Nearest Neighbor
|
||
FalseNnRes = eeg.falseNearestNeighbor(eegData)
|
||
return FalseNnRes
|
||
##############################################################################
|
||
|
||
########################Category Features#####################################
|
||
#10>
|
||
@staticmethod
|
||
def median_frequency(eegData,fs):
|
||
#fs-sampling frequency
|
||
# Median Frequency
|
||
medianFreqRes = eeg.medianFreq(eegData,fs)
|
||
return medianFreqRes
|
||
|
||
#11>
|
||
@staticmethod
|
||
def band_power_delta(eegData,fs):
|
||
#fs - sampling frequency
|
||
# δ band Power
|
||
bandPwr_delta = eeg.bandPower(eegData, 0.5, 4, fs)
|
||
return bandPwr_delta
|
||
#12>
|
||
@staticmethod
|
||
def band_power_theta(eegData,fs):
|
||
#fs - sampling frequency
|
||
# θ band Power
|
||
bandPwr_theta = eeg.bandPower(eegData, 4, 8, fs)
|
||
return bandPwr_theta
|
||
|
||
#13>
|
||
@staticmethod
|
||
def band_power_alpha(eegData,fs):
|
||
#fs - sampling frequency
|
||
# α band Power
|
||
bandPwr_alpha = eeg.bandPower(eegData, 8, 12, fs)
|
||
return bandPwr_alpha
|
||
|
||
#14>
|
||
@staticmethod
|
||
def band_power_beta(eegData,fs):
|
||
#fs - sampling frequency
|
||
# β band Power
|
||
bandPwr_beta = eeg.bandPower(eegData, 12, 30, fs)
|
||
return bandPwr_beta
|
||
|
||
#15>
|
||
@staticmethod
|
||
def band_power_gamma(eegData,fs):
|
||
#fs - sampling frequency
|
||
# γ band Power
|
||
bandPwr_gamma = eeg.bandPower(eegData, 30, 63, fs)
|
||
return bandPwr_gamma
|
||
|
||
#16>
|
||
@staticmethod
|
||
def standard_deviation(eegData,**args):
|
||
# Standard Deviation
|
||
std_res = eeg.eegStd(eegData)
|
||
return std_res
|
||
|
||
#17>
|
||
@staticmethod
|
||
def regularity(eegData,fs):
|
||
# Regularity (burst-suppression)
|
||
regularity_res = eeg.eegRegularity(eegData,fs)
|
||
return regularity_res
|
||
|
||
|
||
#18>
|
||
@staticmethod
|
||
def Diffuse_slowing(eegData,**args):
|
||
# Diffuse Slowing
|
||
df_res = eeg.diffuseSlowing(eegData)
|
||
return df_res
|
||
|
||
#19>
|
||
@staticmethod
|
||
def Spikes(eegData,fs,**args):
|
||
# Spikes
|
||
minNumSamples = int(70*fs/1000)
|
||
spikeNum_res = eeg.spikeNum(eegData,minNumSamples)
|
||
return spikeNum_res
|
||
|
||
#20>
|
||
@staticmethod
|
||
def delta_burst_after_spike(eegData,fs):
|
||
# Delta burst after Spike
|
||
eegData_delta = eeg.filt_data(eegData, 0.5, 4, fs)
|
||
deltaBurst_res = eeg.burstAfterSpike(eegData,eegData_delta,minNumSamples=7,stdAway = 3)
|
||
return deltaBurst_res
|
||
|
||
#21>
|
||
@staticmethod
|
||
def Sharp_spike(eegData,fs):
|
||
minNumSamples = int(70*fs/1000)
|
||
# Sharp spike
|
||
sharpSpike_res = eeg.shortSpikeNum(eegData,minNumSamples)
|
||
return sharpSpike_res
|
||
|
||
#22>
|
||
@staticmethod
|
||
def Number_of_Burst(eegData,fs):
|
||
# Number of Bursts
|
||
numBursts_res = eeg.numBursts(eegData,fs)
|
||
return numBursts_res
|
||
|
||
#23>
|
||
@staticmethod
|
||
def Burst_length_u_and_sigma_mean(eegData,fs):
|
||
# Burst length μ and σ
|
||
burstLenMean_res,burstLenStd_res = eeg.burstLengthStats(eegData,fs)
|
||
return burstLenMean_res
|
||
|
||
#24>
|
||
@staticmethod
|
||
def Burst_length_u_and_sigma_std(eegData,fs):
|
||
burstLenMean_res,burstLenStd_res = eeg.burstLengthStats(eegData,fs)
|
||
return burstLenStd_res
|
||
|
||
|
||
#25>
|
||
@staticmethod
|
||
def no_of_suprression(eegData,fs):
|
||
# Number of Suppressions
|
||
numSupps_res = eeg.numSuppressions(eegData,fs)
|
||
return numSupps_res
|
||
|
||
#26>
|
||
@staticmethod
|
||
def Suppression_length_u_and_sigma_mean(eegData,fs):
|
||
# Suppression length μ and σ
|
||
suppLenMean_res,suppLenStd_res = eeg.suppressionLengthStats(eegData,fs)
|
||
return suppLenMean_res
|
||
|
||
#27>
|
||
@staticmethod
|
||
def Suppression_length_u_and_sigma_std(eegData,fs):
|
||
# Suppression length μ and σ
|
||
suppLenMean_res,suppLenStd_res = eeg.suppressionLengthStats(eegData,fs)
|
||
return suppLenStd_res
|
||
|
||
##############################################################################
|
||
|
||
def merge_dictionary(dictionary):
|
||
'''
|
||
merge all trial data to form one array
|
||
'''
|
||
no_of_trials = len(list(dictionary.keys()))
|
||
no_of_channels = dictionary[1][0].shape[1]
|
||
length_of_segment = dictionary[1][0].shape[2]
|
||
no_of_epochs_per_trial = dictionary[1][0].shape[0]
|
||
X = np.empty((0,no_of_channels,length_of_segment))
|
||
Y = np.empty((0,2))
|
||
for trial,lst in dictionary.items():
|
||
array = dictionary[trial][0]
|
||
score = dictionary[trial][3]
|
||
X = np.append(X,array,axis = 0)
|
||
for epoch in range(no_of_epochs_per_trial):
|
||
Y = np.append(Y,np.expand_dims(score,axis =0),axis = 0)
|
||
|
||
return X,Y
|
||
|
||
def epoch_data(X,Y, window, stride, sfreq):
|
||
|
||
(channels,timepoints,trials )= X.shape
|
||
X = np.reshape(X,(trials,channels,timepoints))
|
||
segment = int(window*sfreq)
|
||
step = int(stride*sfreq)
|
||
epochPerTrial = int((timepoints-segment)/step + 1)
|
||
count = 0
|
||
X_new = np.empty((trials*epochPerTrial,channels,segment))
|
||
Y_new = np.empty((trials*epochPerTrial,2))
|
||
for trial in range(trials):
|
||
for epoch in range(epochPerTrial):
|
||
X_new[count,:,:] = X[trial,:,epoch*step:(epoch*step)+segment]
|
||
Y_new[count,:] = Y[trial,:2]
|
||
count+=1
|
||
(trials,channels,timepoints) = X_new.shape
|
||
X_new = np.reshape(X_new,(channels,timepoints,trials))
|
||
|
||
return X_new,Y_new
|
||
|
||
def driver_code():
|
||
dataset_dictionary = {0:'DEAP',1:'OASIS',2:'DREAMER'}
|
||
print(dataset_dictionary)
|
||
print('enter number for loading dataset')
|
||
mapping = int(input())
|
||
print('plz wait loading dataset preprocessed arrays')
|
||
dataset = load_data(dataset_dictionary[mapping])
|
||
if mapping == 1:
|
||
print('do you want to use with autoreject data? if yes press y')
|
||
boolean = input()
|
||
if boolean == 'y':
|
||
dataset.use_autoreject = 'y'
|
||
|
||
dataset.load_arrays()
|
||
print('loading complete')
|
||
print('shape of data we will use to make features:',dataset.eegData.shape)
|
||
print('do you want to segment the data before calculating feature values? y/n')
|
||
boolean = input()
|
||
if boolean == 'y':
|
||
window = float(input('enter window size'))
|
||
stride = float(input('enter stride size'))
|
||
dataset.eegData,dataset.Y = epoch_data(dataset.eegData,dataset.Y,window,stride,dataset.freq)
|
||
print('new shapes of X and Y:',dataset.eegData.shape,' ',dataset.Y.shape)
|
||
else:
|
||
window = 0
|
||
stride = 0
|
||
print('features available')
|
||
featuresDict = {0:'shannonEntropy',
|
||
1:'ShannonRes_sub_bands_alpha',
|
||
2:'ShannonRes_sub_bands_beta',
|
||
3:'ShannonRes_sub_bands_delta',
|
||
4:'ShannonRes_sub_bands_theta',
|
||
5:'ShannonRes_sub_bands_gamma',
|
||
6:'Hjorth_mobilty',
|
||
7:'Hjorth_complexity',
|
||
8:'falseNearestNeighbor',
|
||
9:'medianFreq',
|
||
10:'bandPwr_alpha',
|
||
11:'bandPwr_beta',
|
||
12:'bandPwr_gamma',
|
||
13:'bandPwr_theta',
|
||
14:'bandPwr_delta',
|
||
15:'stdDev',
|
||
16:'diffuseSlowing',
|
||
17:'spikeNum',
|
||
18:'deltaBurstAfterSpike',
|
||
19:'shortSpikeNum',
|
||
20:'Sharp spike',
|
||
21:'numBursts',
|
||
22:'burstLen_u_and_sigma_mean',
|
||
23:'burstLen_u_and_sigma_std',
|
||
24:'numSuppressions',
|
||
25:'suppressionLen_u_and_sigma_mean',
|
||
26:'suppressionLen_u_and_sigma_std',
|
||
}
|
||
featureMethod={0:features.ShannonRes,
|
||
1:features.ShannonRes_sub_band_alpha,
|
||
2:features.ShannonRes_sub_band_beta,
|
||
3:features.ShannonRes_sub_band_delta,
|
||
4:features.ShannonRes_sub_band_theta,
|
||
5:features.ShannonRes_sub_band_gamma,
|
||
6:features.Hojorth_Mobility,
|
||
7:features.Hojorth_Complexity,
|
||
8:features.False_Nearest_Neighbour,
|
||
9:features.median_frequency,
|
||
10:features.band_power_alpha,
|
||
11:features.band_power_beta,
|
||
12:features.band_power_gamma,
|
||
13:features.band_power_theta,
|
||
14:features.band_power_delta,
|
||
15:features.standard_deviation,
|
||
16:features.regularity,
|
||
17:features.Diffuse_slowing,
|
||
18:features.Spikes,
|
||
19:features.delta_burst_after_spike,
|
||
20:features.Sharp_spike,
|
||
21:features.Number_of_Burst,
|
||
22:features.Burst_length_u_and_sigma_mean,
|
||
23:features.Burst_length_u_and_sigma_std,
|
||
24:features.no_of_suprression,
|
||
25:features.Suppression_length_u_and_sigma_mean,
|
||
26:features.Suppression_length_u_and_sigma_std,
|
||
}
|
||
|
||
|
||
print(featuresDict)
|
||
|
||
#define path for saving before hand in np.savez line below
|
||
path = 'features/'
|
||
#os.mkdir('features/'+window+'_'+stride)
|
||
if dataset.name == 'DEAP':
|
||
path = path +'deap/'
|
||
elif dataset.name == 'DREAMER':
|
||
path = path + 'dreamer/'
|
||
else:
|
||
if dataset.use_autoreject == 'y':
|
||
path = path +'oasis/with_autoreject/'
|
||
else:
|
||
path = path +'oasis/without_autoreject/'
|
||
boolean = input('do you want to individually make features? y/n')
|
||
if boolean =='n':
|
||
for key in featureMethod.keys():
|
||
feature_matrix = featureMethod[key](eegData = dataset.eegData,fs=dataset.freq)
|
||
filename = featuresDict[key]
|
||
print('saving ---',filename)
|
||
np.savez(path+filename+'_'+str(int(window))+'_'+str(int(stride)),features = feature_matrix , Y = dataset.Y)
|
||
else:
|
||
found_features = False
|
||
while not found_features:
|
||
print('enter feature no')
|
||
key = int(input())
|
||
feature_matrix = featureMethod[key](eegData = dataset.eegData,fs=dataset.freq)
|
||
filename = featuresDict[key]
|
||
print('saving ---',filename)
|
||
np.savez(path+filename+'_'+str(int(window))+'_'+str(int(stride)),features = feature_matrix , Y = dataset.Y)
|
||
boolean = input('do you want to find more features? y/n ')
|
||
if boolean =='n':
|
||
found_features = True
|
||
|
||
|
||
print('feature extraction done!!!!')
|
||
|
||
def __main__():
|
||
driver_code()
|
||
|
||
__main__()
|
||
|
||
if __name__ == 'main':
|
||
driver_code()
|
||
|
||
|
||
|