new lab new folder
Esse commit está contido em:
@@ -0,0 +1,19 @@
|
||||
## code by Alexandre Barachant
|
||||
## adapted by Pierre Karashchuk for compatibility with Python3
|
||||
|
||||
from pylsl import StreamInfo, StreamOutlet
|
||||
import numpy as np
|
||||
try:
|
||||
input = raw_input
|
||||
except NameError:
|
||||
pass
|
||||
|
||||
info = StreamInfo('Ganglion_EEG', 'EEG', 4, 200, 'float32',
|
||||
'Ganglion_123456789')
|
||||
outlet = StreamOutlet(info)
|
||||
while True:
|
||||
strSample = input().split(': ', 1)
|
||||
sample = 1e6*np.array(list(map(float, strSample[1].split(' '))))
|
||||
stamp = float(strSample[0])*1e-3
|
||||
outlet.push_sample(sample, stamp)
|
||||
# print('Pushed Sample At: ' + strSample[0])
|
||||
@@ -0,0 +1,24 @@
|
||||
# Lab 5: SSVEP
|
||||
|
||||
### Introduction
|
||||
In this lab, we will record EEG while trying to remember words, as well as later recognizing these same words among others. Hopefully, we'll be able to see the event related potentials corresponding to remembered vs not-remembered words, and possibly recognized vs not recognized words.
|
||||
|
||||
### Setup
|
||||
|
||||
First, install the libraries:
|
||||
``` bash
|
||||
npm install
|
||||
pip install -r requirements.txt
|
||||
```
|
||||
|
||||
(If you don't have `npm`, you can install by running `brew install node`. You can get `brew` from https://brew.sh/)
|
||||
|
||||
### Stimulus Presentation + Recording
|
||||
|
||||
|
||||
- Attach electrodes to participant's head, preferably in visual cortex on the back of the head.
|
||||
- Have participant sit in chair in front of monitor
|
||||
- Connect to the ganglion and stream data: `node ganglion-lsl.js`
|
||||
- Run lsl-viewer to check connections and stream: `python lsl-viewer.py`
|
||||
- Run SSVEP stimulus: `python stimulus.py`
|
||||
|
||||
Diff do arquivo suprimido porque uma ou mais linhas são muito longas
Arquivo executável
+204
@@ -0,0 +1,204 @@
|
||||
#!/usr/bin/env python
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
# from myo_raw import MyoRaw
|
||||
# from csv_collector_myo import CSVCollector
|
||||
import pygame
|
||||
from pygame.locals import *
|
||||
import random
|
||||
import time
|
||||
import sys
|
||||
import os
|
||||
|
||||
fullscreen_on = False
|
||||
|
||||
size = width, height = int(2560/2), int(1600/2)
|
||||
|
||||
black = (0,0,0)
|
||||
white = (255, 255, 255)
|
||||
background_color = white
|
||||
|
||||
def stroop(congruent = False, speed=1.0, duration=20):
|
||||
"""Stroop Test HARD"""
|
||||
colors = {"BLUE" : (0,0,225), "GREEN" : (0,100,0), "YELLOW" : (230,212,41),
|
||||
"RED" : (255,0,0), "PURPLE" : (160,32,240), "BLACK" : (0,0,0)}
|
||||
|
||||
def newcolor():
|
||||
# any color but black or white
|
||||
return (random.choice(list(colors.values())))
|
||||
|
||||
def write(msg="Hello", color = (0,100,0)):
|
||||
myfont = pygame.font.SysFont("None", 250)
|
||||
mytext = myfont.render(msg, True, color)
|
||||
mytext = mytext.convert_alpha()
|
||||
return mytext
|
||||
|
||||
x = width / 2.0
|
||||
y = height / 2.0
|
||||
dx = 0
|
||||
dy = 0
|
||||
|
||||
background = pygame.Surface((screen.get_width(), screen.get_height()))
|
||||
background.fill(background_color) # white
|
||||
background = background.convert()
|
||||
screen.blit(background, (0,0)) # clean whole screen
|
||||
clock = pygame.time.Clock()
|
||||
# FPS = 1.5 * speed # desired framerate in frames per second.
|
||||
wait_time = 1.0 / speed
|
||||
|
||||
start = time.time()
|
||||
|
||||
while time.time() - start < duration:
|
||||
screen.blit(background, (0,0)) # clean whole screen
|
||||
# milliseconds = clock.tick(FPS) # milliseconds passed since last frame
|
||||
# seconds = milliseconds / 1000.0 # seconds passed since last frame
|
||||
for event in pygame.event.get():
|
||||
if event.type == pygame.QUIT:
|
||||
return False
|
||||
elif event.type == pygame.KEYDOWN:
|
||||
if event.key == pygame.K_ESCAPE:
|
||||
return False
|
||||
text = random.choice(["RED", "GREEN", "BLUE", "BLACK", "PURPLE"])
|
||||
color = (newcolor())
|
||||
if congruent:
|
||||
color = colors[text]
|
||||
textsurface = write(text, color)
|
||||
#screen.blit(background, (0,0)) # clean whole screen
|
||||
|
||||
rect = textsurface.get_rect()
|
||||
rect.center = (x, y)
|
||||
|
||||
screen.blit(textsurface, rect)
|
||||
pygame.display.flip()
|
||||
|
||||
time.sleep(wait_time)
|
||||
check_collection()
|
||||
# pygame.quit()
|
||||
return True
|
||||
|
||||
|
||||
def draw_text(text, center, color=black, size=22, bold=False, background=None,
|
||||
left=False):
|
||||
# font = pygame.font.Font(font_path, size, bold=bold)
|
||||
font = pygame.font.SysFont("None", size, bold=bold)
|
||||
if background:
|
||||
surface = font.render(text, True, color, background)
|
||||
else:
|
||||
surface = font.render(text, True, color)
|
||||
|
||||
rect = surface.get_rect()
|
||||
rect.center = tuple(center)
|
||||
if left:
|
||||
rect.left = center[0]
|
||||
|
||||
screen.blit(surface, rect)
|
||||
return rect
|
||||
|
||||
def text_slide(text, duration=20):
|
||||
screen.fill(background_color)
|
||||
draw_text(text, (width/2.0, height/2.0), size=250)
|
||||
pygame.display.flip()
|
||||
|
||||
start = time.time()
|
||||
|
||||
while time.time() - start < duration:
|
||||
for event in pygame.event.get():
|
||||
if event.type == pygame.QUIT:
|
||||
return False
|
||||
elif event.type == pygame.KEYDOWN:
|
||||
if event.key == pygame.K_ESCAPE:
|
||||
return False
|
||||
time.sleep(0.01)
|
||||
check_collection()
|
||||
|
||||
return True
|
||||
|
||||
def focus_slide(duration=20):
|
||||
return text_slide('+', duration)
|
||||
|
||||
fname = '../data/data_stroop_myo_{}.csv'.format(int(time.time()))
|
||||
if len(sys.argv) >= 2:
|
||||
fname = sys.argv[1]
|
||||
|
||||
# collector = CSVCollector(fname=fname)
|
||||
# collector.tag('start')
|
||||
|
||||
def check_collection():
|
||||
# if collector.time_since_last_sample() > 1.2:
|
||||
# print('detected disconnect!')
|
||||
# print('attempting reconnect')
|
||||
# # collector.stop()
|
||||
# time.sleep(1.5)
|
||||
# # collector.start()
|
||||
# time.sleep(2)
|
||||
pass
|
||||
|
||||
# print("get ready...")
|
||||
# time.sleep(2)
|
||||
# collector.start()
|
||||
# print("a bit more...")
|
||||
# time.sleep(2)
|
||||
# print("go!")
|
||||
|
||||
pygame.init()
|
||||
|
||||
if fullscreen_on:
|
||||
screen = pygame.display.set_mode(size, FULLSCREEN)
|
||||
else:
|
||||
screen = pygame.display.set_mode(size)
|
||||
|
||||
pygame.mouse.set_visible(False)
|
||||
|
||||
# relax, movement
|
||||
# low, medium, hard
|
||||
# speed 0.5, 1.0, 2.0
|
||||
|
||||
trial_duration = 10
|
||||
focus_duration = 5
|
||||
num_repeats = 2
|
||||
|
||||
# trial_duration = 2
|
||||
# focus_duration = 1
|
||||
# num_repeats = 1
|
||||
|
||||
conditions = [
|
||||
# ('relax', None, None), ('move', None, None),
|
||||
('low', 1.1, True), ('medium', 1.1, False), ('hard', 1.8, False)
|
||||
]
|
||||
|
||||
conds = conditions * num_repeats
|
||||
random.shuffle(conds)
|
||||
|
||||
for cond_name, speed, congruent in conds:
|
||||
# collector.tag('focus')
|
||||
x = focus_slide(focus_duration)
|
||||
if not x:
|
||||
break
|
||||
|
||||
if speed is None:
|
||||
# collector.tag(cond_name)
|
||||
x = text_slide(cond_name, trial_duration)
|
||||
else:
|
||||
# collector.tag(cond_name)
|
||||
x = stroop(congruent, speed, trial_duration)
|
||||
|
||||
# check_collection()
|
||||
|
||||
if not x:
|
||||
break
|
||||
# print(cond_name)
|
||||
# time.sleep(1)
|
||||
# time.sleep(trial_duration)
|
||||
# stroop()
|
||||
# collector.stop()
|
||||
pygame.quit()
|
||||
os._exit(0)
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
@@ -0,0 +1,29 @@
|
||||
// code by Alexandre Barachant
|
||||
|
||||
const Ganglion = require('openbci-ganglion').Ganglion;
|
||||
const ganglion = new Ganglion();
|
||||
// Construct LSL Handoff Python Shell
|
||||
var PythonShell = require('python-shell');
|
||||
var lsloutlet = new PythonShell('LSLHandoff.py');
|
||||
|
||||
lsloutlet.on('message', function(message){
|
||||
console.log('LslOutlet: ' + message);
|
||||
});
|
||||
console.log('Python Shell Created for LSLHandoff');
|
||||
|
||||
ganglion.once('ganglionFound', (peripheral) => {
|
||||
// Stop searching for BLE devices once a ganglion is found.
|
||||
ganglion.searchStop();
|
||||
ganglion.on('sample', (sample) => {
|
||||
/** Work with sample */
|
||||
st = sample.channelData.join(' ');
|
||||
var s = ''+ sample.timeStamp + ': '+ st
|
||||
lsloutlet.send(s)
|
||||
});
|
||||
ganglion.once('ready', () => {
|
||||
ganglion.streamStart();
|
||||
});
|
||||
ganglion.connect(peripheral);
|
||||
});
|
||||
// Start scanning for BLE devices
|
||||
ganglion.searchStart();
|
||||
Arquivo executável
+241
@@ -0,0 +1,241 @@
|
||||
#!/usr/bin/env python
|
||||
## code by Alexandre Barachant
|
||||
## modified by Pierre Karashchuk
|
||||
import numpy as np
|
||||
import matplotlib.pyplot as plt
|
||||
from scipy.signal import butter, filtfilt
|
||||
from time import time, sleep
|
||||
from pylsl import StreamInlet, resolve_byprop
|
||||
import seaborn as sns
|
||||
from threading import Thread
|
||||
from scipy import signal
|
||||
|
||||
sns.set(style="whitegrid")
|
||||
|
||||
from optparse import OptionParser
|
||||
|
||||
parser = OptionParser()
|
||||
|
||||
parser.add_option("-w", "--window",
|
||||
dest="window", type='float', default=6,
|
||||
help="window lenght to display in seconds.")
|
||||
parser.add_option("-s", "--scale",
|
||||
dest="scale", type='float', default=100,
|
||||
help="scale in uV")
|
||||
parser.add_option("-r", "--refresh",
|
||||
dest="refresh", type='float', default=0.2,
|
||||
help="refresh rate in seconds.")
|
||||
parser.add_option("-f", "--figure",
|
||||
dest="figure", type='string', default="15x6",
|
||||
help="window size.")
|
||||
|
||||
filt = True
|
||||
subsample = 2
|
||||
buf = 12
|
||||
|
||||
(options, args) = parser.parse_args()
|
||||
|
||||
window = options.window
|
||||
scale = options.scale
|
||||
figsize = np.int16(options.figure.split('x'))
|
||||
refresh = options.refresh
|
||||
|
||||
print("looking for an EEG stream...")
|
||||
streams = resolve_byprop('type', 'EEG', timeout=2)
|
||||
|
||||
if len(streams) == 0:
|
||||
raise(RuntimeError("Cant find EEG stream"))
|
||||
print("Start aquiring data")
|
||||
|
||||
|
||||
class LSLViewer():
|
||||
def __init__(self, stream, fig, axes, window, scale, dejitter=True):
|
||||
"""Init"""
|
||||
self.stream = stream
|
||||
self.window = window
|
||||
self.scale = scale
|
||||
self.dejitter = dejitter
|
||||
self.inlet = StreamInlet(stream, max_chunklen=buf)
|
||||
self.filt = True
|
||||
info = self.inlet.info()
|
||||
description = info.desc()
|
||||
|
||||
self.sfreq = info.nominal_srate()
|
||||
self.n_samples = int(self.sfreq * self.window)
|
||||
self.n_chan = info.channel_count()
|
||||
|
||||
ch = description.child('channels').first_child()
|
||||
ch_names = [ch.child_value('label')]
|
||||
|
||||
for i in range(self.n_chan):
|
||||
ch = ch.next_sibling()
|
||||
ch_names.append(ch.child_value('label'))
|
||||
|
||||
self.ch_names = ch_names
|
||||
|
||||
fig.canvas.mpl_connect('key_press_event', self.OnKeypress)
|
||||
fig.canvas.mpl_connect('button_press_event', self.onclick)
|
||||
|
||||
self.fig = fig
|
||||
self.axes = axes
|
||||
|
||||
|
||||
sns.despine(left=True)
|
||||
|
||||
self.data = np.zeros((self.n_samples, self.n_chan))
|
||||
self.times = np.arange(-self.window, 0, 1./self.sfreq)
|
||||
impedances = np.std(self.data, axis=0)
|
||||
lines = []
|
||||
|
||||
self.rects = self.axes[1].bar(0, 1)
|
||||
|
||||
lines = []
|
||||
|
||||
for ii in range(self.n_chan):
|
||||
line, = self.axes[0].plot(self.times[::subsample],
|
||||
self.data[::subsample, ii] - ii, lw=1)
|
||||
lines.append(line)
|
||||
self.lines = lines
|
||||
|
||||
|
||||
# self.text = axes.
|
||||
|
||||
self.axes[1].xaxis.grid(False)
|
||||
self.axes[1].set_xticks([])
|
||||
self.axes[1].set_ylim([0,120])
|
||||
self.value = None
|
||||
|
||||
self.display_every = int(refresh / (12/self.sfreq))
|
||||
|
||||
self.bf, self.af = butter(4, np.array([0.5,20])/(self.sfreq/2.),
|
||||
'bandpass')
|
||||
|
||||
self.low = 10000
|
||||
self.high = 0
|
||||
|
||||
def compute_value(self):
|
||||
data_f1 = filtfilt(self.bf, self.af, self.data[:, 0])
|
||||
|
||||
data_f1 -= np.mean(data_f1)
|
||||
data_f1 /= np.std(data_f1)
|
||||
|
||||
rises = np.where(np.diff(1.0*(np.abs(data_f1) > 2)) == 1)[0]
|
||||
|
||||
rr = np.diff(rises)/self.sfreq
|
||||
print(1/np.mean(rr), rr)
|
||||
|
||||
return 60./np.mean(rr)
|
||||
|
||||
|
||||
def update_plot(self):
|
||||
value = self.compute_value()
|
||||
|
||||
if np.isnan(value):
|
||||
return
|
||||
|
||||
if self.value is None:
|
||||
self.value = value
|
||||
|
||||
self.value = 0.8 * self.value + 0.2 * value
|
||||
|
||||
self.low = min(self.low, self.value)
|
||||
self.high = max(self.high, self.value)
|
||||
|
||||
rect = self.rects.get_children()[0]
|
||||
rect.set_height(self.value)
|
||||
|
||||
self.axes[1].set_ylim([0,240])
|
||||
# self.fig.canvas.draw()
|
||||
# plt.pause(0.01)
|
||||
|
||||
def update_lines(self):
|
||||
if self.filt:
|
||||
data_f = filtfilt(self.bf, self.af, self.data, axis=0)
|
||||
else:
|
||||
data_f = self.data
|
||||
data_f -= data_f.mean(axis=0)
|
||||
|
||||
for ii in range(self.n_chan):
|
||||
self.lines[ii].set_xdata(self.times[::subsample] -
|
||||
self.times[-1])
|
||||
self.lines[ii].set_ydata(data_f[::subsample, ii] /
|
||||
self.scale - ii)
|
||||
|
||||
impedances = np.std(data_f, axis=0)
|
||||
self.scale = impedances[0]
|
||||
ticks_labels = ['%s - %.2f' %
|
||||
(self.ch_names[ii], impedances[ii])
|
||||
for ii in range(self.n_chan)]
|
||||
self.axes[0].set_yticklabels(ticks_labels)
|
||||
self.axes[0].set_xlim(-self.window, 0)
|
||||
|
||||
|
||||
def update_data_and_plot(self):
|
||||
k = 0
|
||||
while self.started:
|
||||
samples, timestamps = self.inlet.pull_chunk(timeout=1.0,
|
||||
max_samples=buf)
|
||||
|
||||
if timestamps:
|
||||
self.data = np.vstack([self.data, samples])
|
||||
if self.dejitter:
|
||||
timestamps = np.float64(np.arange(len(timestamps)))
|
||||
timestamps /= self.sfreq
|
||||
timestamps += self.times[-1] + 1./self.sfreq
|
||||
self.times = np.concatenate([self.times, timestamps])
|
||||
|
||||
self.n_samples = int(self.sfreq * self.window)
|
||||
self.data = self.data[-self.n_samples:]
|
||||
self.times = self.times[-self.n_samples:]
|
||||
|
||||
|
||||
k += 1
|
||||
|
||||
if k >= self.display_every:
|
||||
self.update_lines()
|
||||
self.update_plot()
|
||||
self.fig.canvas.draw()
|
||||
plt.pause(0.01)
|
||||
|
||||
k = 0
|
||||
else:
|
||||
sleep(0.1)
|
||||
|
||||
def onclick(self, event):
|
||||
print((event.button, event.x, event.y, event.xdata, event.ydata))
|
||||
|
||||
def OnKeypress(self, event):
|
||||
if event.key == 'r':
|
||||
self.low = 10000
|
||||
self.high = 0
|
||||
elif event.key == '+':
|
||||
self.window += 1
|
||||
elif event.key == '-':
|
||||
if self.window > 1:
|
||||
self.window -= 1
|
||||
elif event.key == 'd':
|
||||
self.filt = not(self.filt)
|
||||
|
||||
def start(self):
|
||||
self.started = True
|
||||
self.thread = Thread(target=self.update_data_and_plot)
|
||||
self.thread.daemon = True
|
||||
self.thread.start()
|
||||
|
||||
def stop(self):
|
||||
self.started = False
|
||||
|
||||
|
||||
fig, axes = plt.subplots(1, 2, figsize=figsize, sharex=True)
|
||||
lslv = LSLViewer(streams[0], fig, axes, window, scale)
|
||||
|
||||
help_str = """
|
||||
reset scale: r
|
||||
increase time scale : -
|
||||
decrease time scale : +
|
||||
"""
|
||||
print(help_str)
|
||||
lslv.start()
|
||||
|
||||
plt.show()
|
||||
lslv.stop()
|
||||
Arquivo executável
+101
@@ -0,0 +1,101 @@
|
||||
#!/usr/bin/env python
|
||||
## code by Alexandre Barachant
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
from time import time, strftime, gmtime
|
||||
from optparse import OptionParser
|
||||
from pylsl import StreamInlet, resolve_byprop
|
||||
from sklearn.linear_model import LinearRegression
|
||||
|
||||
default_fname = ("data/data_%s.csv" % strftime("%Y-%m-%d-%H.%M.%S", gmtime()))
|
||||
parser = OptionParser()
|
||||
parser.add_option("-d", "--duration",
|
||||
dest="duration", type='int', default=300,
|
||||
help="duration of the recording in seconds.")
|
||||
parser.add_option("-f", "--filename",
|
||||
dest="filename", type='str', default=default_fname,
|
||||
help="Name of the recording file.")
|
||||
|
||||
# dejitter timestamps
|
||||
dejitter = False
|
||||
|
||||
(options, args) = parser.parse_args()
|
||||
|
||||
|
||||
|
||||
print("looking for an EEG stream...")
|
||||
streams = resolve_byprop('type', 'EEG', timeout=2)
|
||||
|
||||
if len(streams) == 0:
|
||||
raise(RuntimeError, "Cant find EEG stream")
|
||||
|
||||
print("Start aquiring data")
|
||||
inlet = StreamInlet(streams[0], max_chunklen=12)
|
||||
eeg_time_correction = inlet.time_correction()
|
||||
|
||||
print("looking for a Markers stream...")
|
||||
marker_streams = resolve_byprop('type', 'Markers', timeout=2)
|
||||
|
||||
if marker_streams:
|
||||
inlet_marker = StreamInlet(marker_streams[0])
|
||||
marker_time_correction = inlet_marker.time_correction()
|
||||
else:
|
||||
inlet_marker = False
|
||||
print("Cant find Markers stream")
|
||||
|
||||
info = inlet.info()
|
||||
description = info.desc()
|
||||
|
||||
freq = info.nominal_srate()
|
||||
Nchan = info.channel_count()
|
||||
|
||||
ch = description.child('channels').first_child()
|
||||
ch_names = [ch.child_value('label')]
|
||||
for i in range(1, Nchan):
|
||||
ch = ch.next_sibling()
|
||||
ch_names.append(ch.child_value('label'))
|
||||
|
||||
res = []
|
||||
timestamps = []
|
||||
markers = []
|
||||
t_init = time()
|
||||
print('Start recording at time t=%.3f' % t_init)
|
||||
while (time() - t_init) < options.duration:
|
||||
try:
|
||||
data, timestamp = inlet.pull_chunk(timeout=1.0,
|
||||
max_samples=12)
|
||||
if timestamp:
|
||||
res.append(data)
|
||||
timestamps.extend(timestamp)
|
||||
if inlet_marker:
|
||||
marker, timestamp = inlet_marker.pull_sample(timeout=0.0)
|
||||
if timestamp:
|
||||
markers.append([marker, timestamp])
|
||||
except KeyboardInterrupt:
|
||||
break
|
||||
|
||||
res = np.concatenate(res, axis=0)
|
||||
timestamps = np.array(timestamps)
|
||||
|
||||
if dejitter:
|
||||
y = timestamps
|
||||
X = np.atleast_2d(np.arange(0, len(y))).T
|
||||
lr = LinearRegression()
|
||||
lr.fit(X, y)
|
||||
timestamps = lr.predict(X)
|
||||
|
||||
res = np.c_[timestamps, res]
|
||||
data = pd.DataFrame(data=res, columns=['timestamps'] + ch_names)
|
||||
|
||||
data['Marker'] = 0
|
||||
# process markers:
|
||||
for marker in markers:
|
||||
# find index of margers
|
||||
ix = np.argmin(np.abs(marker[1] - timestamps))
|
||||
val = timestamps[ix]
|
||||
data.loc[ix, 'Marker'] = marker[0][0]
|
||||
|
||||
|
||||
data.to_csv(options.filename, float_format='%.3f', index=False)
|
||||
|
||||
print('Done !')
|
||||
Arquivo executável
+191
@@ -0,0 +1,191 @@
|
||||
#!/usr/bin/env python
|
||||
## code by Alexandre Barachant
|
||||
import numpy as np
|
||||
import matplotlib.pyplot as plt
|
||||
from scipy.signal import butter, filtfilt
|
||||
from time import time, sleep
|
||||
from pylsl import StreamInlet, resolve_byprop
|
||||
import seaborn as sns
|
||||
from threading import Thread
|
||||
|
||||
sns.set(style="whitegrid")
|
||||
|
||||
from optparse import OptionParser
|
||||
|
||||
parser = OptionParser()
|
||||
|
||||
parser.add_option("-w", "--window",
|
||||
dest="window", type='float', default=5.,
|
||||
help="window lenght to display in seconds.")
|
||||
parser.add_option("-s", "--scale",
|
||||
dest="scale", type='float', default=100,
|
||||
help="scale in uV")
|
||||
parser.add_option("-r", "--refresh",
|
||||
dest="refresh", type='float', default=0.2,
|
||||
help="refresh rate in seconds.")
|
||||
parser.add_option("-f", "--figure",
|
||||
dest="figure", type='string', default="15x6",
|
||||
help="window size.")
|
||||
|
||||
filt = True
|
||||
subsample = 2
|
||||
buf = 12
|
||||
|
||||
(options, args) = parser.parse_args()
|
||||
|
||||
window = options.window
|
||||
scale = options.scale
|
||||
figsize = np.int16(options.figure.split('x'))
|
||||
|
||||
print("looking for an EEG stream...")
|
||||
streams = resolve_byprop('type', 'EEG', timeout=2)
|
||||
|
||||
if len(streams) == 0:
|
||||
raise(RuntimeError("Cant find EEG stream"))
|
||||
print("Start aquiring data")
|
||||
|
||||
|
||||
class LSLViewer():
|
||||
|
||||
def __init__(self, stream, fig, axes, window, scale, dejitter=True):
|
||||
"""Init"""
|
||||
self.stream = stream
|
||||
self.window = window
|
||||
self.scale = scale
|
||||
self.dejitter = dejitter
|
||||
self.inlet = StreamInlet(stream, max_chunklen=buf)
|
||||
self.filt = True
|
||||
info = self.inlet.info()
|
||||
description = info.desc()
|
||||
|
||||
self.sfreq = info.nominal_srate()
|
||||
self.n_samples = int(self.sfreq * self.window)
|
||||
self.n_chan = info.channel_count()
|
||||
|
||||
ch = description.child('channels').first_child()
|
||||
ch_names = [ch.child_value('label')]
|
||||
|
||||
for i in range(self.n_chan):
|
||||
ch = ch.next_sibling()
|
||||
ch_names.append(ch.child_value('label'))
|
||||
|
||||
self.ch_names = ch_names
|
||||
|
||||
fig.canvas.mpl_connect('key_press_event', self.OnKeypress)
|
||||
fig.canvas.mpl_connect('button_press_event', self.onclick)
|
||||
|
||||
self.fig = fig
|
||||
self.axes = axes
|
||||
|
||||
sns.despine(left=True)
|
||||
|
||||
self.data = np.zeros((self.n_samples, self.n_chan))
|
||||
self.times = np.arange(-self.window, 0, 1./self.sfreq)
|
||||
impedances = np.std(self.data, axis=0)
|
||||
lines = []
|
||||
|
||||
for ii in range(self.n_chan):
|
||||
line, = axes.plot(self.times[::subsample],
|
||||
self.data[::subsample, ii] - ii, lw=1)
|
||||
lines.append(line)
|
||||
self.lines = lines
|
||||
|
||||
axes.set_ylim(-self.n_chan + 0.5, 0.5)
|
||||
ticks = np.arange(0, -self.n_chan, -1)
|
||||
|
||||
axes.set_xlabel('Time (s)')
|
||||
axes.xaxis.grid(False)
|
||||
axes.set_yticks(ticks)
|
||||
|
||||
ticks_labels = ['%s - %.1f' % (ch_names[ii], impedances[ii])
|
||||
for ii in range(self.n_chan)]
|
||||
axes.set_yticklabels(ticks_labels)
|
||||
|
||||
self.display_every = int(0.2 / (12/self.sfreq))
|
||||
|
||||
self.bf, self.af = butter(4, np.array([1, 40])/(self.sfreq/2.),
|
||||
'bandpass')
|
||||
|
||||
def update_plot(self):
|
||||
k = 0
|
||||
while self.started:
|
||||
samples, timestamps = self.inlet.pull_chunk(timeout=1.0,
|
||||
max_samples=12)
|
||||
if timestamps:
|
||||
self.data = np.vstack([self.data, samples])
|
||||
if self.dejitter:
|
||||
timestamps = np.float64(np.arange(len(timestamps)))
|
||||
timestamps /= self.sfreq
|
||||
timestamps += self.times[-1] + 1./self.sfreq
|
||||
self.times = np.concatenate([self.times, timestamps])
|
||||
|
||||
self.n_samples = int(self.sfreq * self.window)
|
||||
self.data = self.data[-self.n_samples:]
|
||||
self.times = self.times[-self.n_samples:]
|
||||
k += 1
|
||||
if k == self.display_every:
|
||||
if self.filt:
|
||||
data_f = filtfilt(self.bf, self.af, self.data, axis=0)
|
||||
else:
|
||||
data_f = self.data
|
||||
data_f -= data_f.mean(axis=0)
|
||||
|
||||
for ii in range(self.n_chan):
|
||||
self.lines[ii].set_xdata(self.times[::subsample] -
|
||||
self.times[-1])
|
||||
self.lines[ii].set_ydata(data_f[::subsample, ii] /
|
||||
self.scale - ii)
|
||||
|
||||
impedances = np.std(data_f, axis=0)
|
||||
ticks_labels = ['%s - %.2f' %
|
||||
(self.ch_names[ii], impedances[ii])
|
||||
for ii in range(self.n_chan)]
|
||||
self.axes.set_yticklabels(ticks_labels)
|
||||
self.axes.set_xlim(-self.window, 0)
|
||||
self.fig.canvas.draw()
|
||||
k = 0
|
||||
else:
|
||||
sleep(0.2)
|
||||
|
||||
def onclick(self, event):
|
||||
print((event.button, event.x, event.y, event.xdata, event.ydata))
|
||||
|
||||
def OnKeypress(self, event):
|
||||
if event.key == '/':
|
||||
self.scale *= 1.2
|
||||
elif event.key == '*':
|
||||
self.scale /= 1.2
|
||||
elif event.key == '+':
|
||||
self.window += 1
|
||||
elif event.key == '-':
|
||||
if self.window > 1:
|
||||
self.window -= 1
|
||||
elif event.key == 'd':
|
||||
self.filt = not(self.filt)
|
||||
|
||||
def start(self):
|
||||
self.started = True
|
||||
self.thread = Thread(target=self.update_plot)
|
||||
self.thread.daemon = True
|
||||
self.thread.start()
|
||||
|
||||
def stop(self):
|
||||
self.started = False
|
||||
|
||||
|
||||
fig, axes = plt.subplots(1, 1, figsize=figsize, sharex=True)
|
||||
lslv = LSLViewer(streams[0], fig, axes, window, scale)
|
||||
|
||||
help_str = """
|
||||
toggle filter : d
|
||||
toogle full screen : f
|
||||
zoom out : /
|
||||
zoom in : *
|
||||
increase time scale : -
|
||||
decrease time scale : +
|
||||
"""
|
||||
print(help_str)
|
||||
lslv.start()
|
||||
|
||||
plt.show()
|
||||
lslv.stop()
|
||||
@@ -0,0 +1,16 @@
|
||||
{
|
||||
"name": "lab3",
|
||||
"version": "1.0.0",
|
||||
"description": "",
|
||||
"main": "ganglion-lsl.js",
|
||||
"dependencies": {
|
||||
"openbci-ganglion": "^0.4.3",
|
||||
"python-shell": "^0.4.0"
|
||||
},
|
||||
"devDependencies": {},
|
||||
"scripts": {
|
||||
"test": "echo \"Error: no test specified\" && exit 1"
|
||||
},
|
||||
"author": "",
|
||||
"license": "GPL-3.0"
|
||||
}
|
||||
@@ -0,0 +1,6 @@
|
||||
pygame
|
||||
numpy
|
||||
scipy
|
||||
matplotlib
|
||||
pandas
|
||||
seaborn
|
||||
Referência em uma Nova Issue
Bloquear um usuário