Arquivos
2017-04-04 11:27:30 +02:00

612 linhas
19 KiB
Python

"""
Core OpenBCI object for handling connections and samples from the board.
EXAMPLE USE:
def handle_sample(sample):
print(sample.channel_data)
board = OpenBCIBoard()
board.print_register_settings()
board.start_streaming(handle_sample)
NOTE: If daisy modules is enabled, the callback will occur every two samples, hence "packet_id" will only contain even numbers. As a side effect, the sampling rate will be divided by 2.
FIXME: at the moment we can just force daisy mode, do not check that the module is detected.
TODO: enable impedance
"""
import serial
import struct
import numpy as np
import time
import timeit
import atexit
import logging
import threading
import sys
import pdb
import glob
SAMPLE_RATE = 250.0 # Hz
START_BYTE = 0xA0 # start of data packet
END_BYTE = 0xC0 # end of data packet
ADS1299_Vref = 4.5 #reference voltage for ADC in ADS1299. set by its hardware
ADS1299_gain = 24.0 #assumed gain setting for ADS1299. set by its Arduino code
scale_fac_uVolts_per_count = ADS1299_Vref/float((pow(2,23)-1))/ADS1299_gain*1000000.
scale_fac_accel_G_per_count = 0.002 /(pow(2,4)) #assume set to +/4G, so 2 mG
'''
#Commands for in SDK http://docs.openbci.com/software/01-Open BCI_SDK:
command_stop = "s";
command_startText = "x";
command_startBinary = "b";
command_startBinary_wAux = "n";
command_startBinary_4chan = "v";
command_activateFilters = "F";
command_deactivateFilters = "g";
command_deactivate_channel = {"1", "2", "3", "4", "5", "6", "7", "8"};
command_activate_channel = {"q", "w", "e", "r", "t", "y", "u", "i"};
command_activate_leadoffP_channel = {"!", "@", "#", "$", "%", "^", "&", "*"}; //shift + 1-8
command_deactivate_leadoffP_channel = {"Q", "W", "E", "R", "T", "Y", "U", "I"}; //letters (plus shift) right below 1-8
command_activate_leadoffN_channel = {"A", "S", "D", "F", "G", "H", "J", "K"}; //letters (plus shift) below the letters below 1-8
command_deactivate_leadoffN_channel = {"Z", "X", "C", "V", "B", "N", "M", "<"}; //letters (plus shift) below the letters below the letters below 1-8
command_biasAuto = "`";
command_biasFixed = "~";
'''
class OpenBCIBoard(object):
"""
Handle a connection to an OpenBCI board.
Args:
port: The port to connect to.
baud: The baud of the serial connection.
daisy: Enable or disable daisy module and 16 chans readings
aux, impedance: unused, for compatibility with ganglion API
"""
def __init__(self, port=None, baud=115200, filter_data=True,
scaled_output=True, daisy=False, aux=False, impedance=False, log=True, timeout=None):
self.log = log # print_incoming_text needs log
self.streaming = False
self.baudrate = baud
self.timeout = timeout
if not port:
port = self.find_port()
self.port = port
# might be handy to know API
self.board_type = "cyton"
print("Connecting to V3 at port %s" %(port))
self.ser = serial.Serial(port= port, baudrate = baud, timeout=timeout)
print("Serial established...")
time.sleep(2)
#Initialize 32-bit board, doesn't affect 8bit board
self.ser.write(b'v');
#wait for device to be ready
time.sleep(1)
self.print_incoming_text()
self.streaming = False
self.filtering_data = filter_data
self.scaling_output = scaled_output
self.eeg_channels_per_sample = 8 # number of EEG channels per sample *from the board*
self.aux_channels_per_sample = 3 # number of AUX channels per sample *from the board*
self.imp_channels_per_sample = 0 # impedance check not supported at the moment
self.read_state = 0
self.daisy = daisy
self.last_odd_sample = OpenBCISample(-1, [], []) # used for daisy
self.log_packet_count = 0
self.attempt_reconnect = False
self.last_reconnect = 0
self.reconnect_freq = 5
self.packets_dropped = 0
#Disconnects from board when terminated
atexit.register(self.disconnect)
def getBoardType(self):
""" Returns the version of the board """
return self.board_type
def setImpedance(self, flag):
""" Enable/disable impedance measure. Not implemented at the moment on Cyton. """
return
def ser_write(self, b):
"""Access serial port object for write"""
self.ser.write(b)
def ser_read(self):
"""Access serial port object for read"""
return self.ser.read()
def ser_inWaiting(self):
"""Access serial port object for inWaiting"""
return self.ser.inWaiting();
def getSampleRate(self):
if self.daisy:
return SAMPLE_RATE/2
else:
return SAMPLE_RATE
def getNbEEGChannels(self):
if self.daisy:
return self.eeg_channels_per_sample*2
else:
return self.eeg_channels_per_sample
def getNbAUXChannels(self):
return self.aux_channels_per_sample
def getNbImpChannels(self):
return self.imp_channels_per_sample
def start_streaming(self, callback, lapse=-1):
"""
Start handling streaming data from the board. Call a provided callback
for every single sample that is processed (every two samples with daisy module).
Args:
callback: A callback function -- or a list of functions -- that will receive a single argument of the
OpenBCISample object captured.
"""
if not self.streaming:
self.ser.write(b'b')
self.streaming = True
start_time = timeit.default_timer()
# Enclose callback funtion in a list if it comes alone
if not isinstance(callback, list):
callback = [callback]
#Initialize check connection
self.check_connection()
while self.streaming:
# read current sample
sample = self._read_serial_binary()
# if a daisy module is attached, wait to concatenate two samples (main board + daisy) before passing it to callback
if self.daisy:
# odd sample: daisy sample, save for later
if ~sample.id % 2:
self.last_odd_sample = sample
# even sample: concatenate and send if last sample was the fist part, otherwise drop the packet
elif sample.id - 1 == self.last_odd_sample.id:
# the aux data will be the average between the two samples, as the channel samples themselves have been averaged by the board
avg_aux_data = list((np.array(sample.aux_data) + np.array(self.last_odd_sample.aux_data))/2)
whole_sample = OpenBCISample(sample.id, sample.channel_data + self.last_odd_sample.channel_data, avg_aux_data)
for call in callback:
call(whole_sample)
else:
for call in callback:
call(sample)
if(lapse > 0 and timeit.default_timer() - start_time > lapse):
self.stop();
if self.log:
self.log_packet_count = self.log_packet_count + 1;
"""
PARSER:
Parses incoming data packet into OpenBCISample.
Incoming Packet Structure:
Start Byte(1)|Sample ID(1)|Channel Data(24)|Aux Data(6)|End Byte(1)
0xA0|0-255|8, 3-byte signed ints|3 2-byte signed ints|0xC0
"""
def _read_serial_binary(self, max_bytes_to_skip=3000):
def read(n):
bb = self.ser.read(n)
if not bb:
self.warn('Device appears to be stalled. Quitting...')
sys.exit()
raise Exception('Device Stalled')
sys.exit()
return '\xFF'
else:
return bb
for rep in range(max_bytes_to_skip):
#---------Start Byte & ID---------
if self.read_state == 0:
b = read(1)
if struct.unpack('B', b)[0] == START_BYTE:
if(rep != 0):
self.warn('Skipped %d bytes before start found' %(rep))
rep = 0;
packet_id = struct.unpack('B', read(1))[0] #packet id goes from 0-255
log_bytes_in = str(packet_id);
self.read_state = 1
#---------Channel Data---------
elif self.read_state == 1:
channel_data = []
for c in range(self.eeg_channels_per_sample):
#3 byte ints
literal_read = read(3)
unpacked = struct.unpack('3B', literal_read)
log_bytes_in = log_bytes_in + '|' + str(literal_read);
#3byte int in 2s compliment
if (unpacked[0] > 127):
pre_fix = bytes(bytearray.fromhex('FF'))
else:
pre_fix = bytes(bytearray.fromhex('00'))
literal_read = pre_fix + literal_read;
#unpack little endian(>) signed integer(i) (makes unpacking platform independent)
myInt = struct.unpack('>i', literal_read)[0]
if self.scaling_output:
channel_data.append(myInt*scale_fac_uVolts_per_count)
else:
channel_data.append(myInt)
self.read_state = 2;
#---------Accelerometer Data---------
elif self.read_state == 2:
aux_data = []
for a in range(self.aux_channels_per_sample):
#short = h
acc = struct.unpack('>h', read(2))[0]
log_bytes_in = log_bytes_in + '|' + str(acc);
if self.scaling_output:
aux_data.append(acc*scale_fac_accel_G_per_count)
else:
aux_data.append(acc)
self.read_state = 3;
#---------End Byte---------
elif self.read_state == 3:
val = struct.unpack('B', read(1))[0]
log_bytes_in = log_bytes_in + '|' + str(val);
self.read_state = 0 #read next packet
if (val == END_BYTE):
sample = OpenBCISample(packet_id, channel_data, aux_data)
self.packets_dropped = 0
return sample
else:
self.warn("ID:<%d> <Unexpected END_BYTE found <%s> instead of <%s>"
%(packet_id, val, END_BYTE))
logging.debug(log_bytes_in);
self.packets_dropped = self.packets_dropped + 1
"""
Clean Up (atexit)
"""
def stop(self):
print("Stopping streaming...\nWait for buffer to flush...")
self.streaming = False
self.ser.write(b's')
if self.log:
logging.warning('sent <s>: stopped streaming')
def disconnect(self):
if(self.streaming == True):
self.stop()
if (self.ser.isOpen()):
print("Closing Serial...")
self.ser.close()
logging.warning('serial closed')
"""
SETTINGS AND HELPERS
"""
def warn(self, text):
if self.log:
#log how many packets where sent succesfully in between warnings
if self.log_packet_count:
logging.info('Data packets received:'+str(self.log_packet_count))
self.log_packet_count = 0;
logging.warning(text)
print("Warning: %s" % text)
def print_incoming_text(self):
"""
When starting the connection, print all the debug data until
we get to a line with the end sequence '$$$'.
"""
line = ''
#Wait for device to send data
time.sleep(1)
if self.ser.inWaiting():
line = ''
c = ''
#Look for end sequence $$$
while '$$$' not in line:
c = self.ser.read().decode('utf-8', errors='replace') # we're supposed to get UTF8 text, but the board might behave otherwise
line += c
print(line);
else:
self.warn("No Message")
def openbci_id(self, serial):
"""
When automatically detecting port, parse the serial return for the "OpenBCI" ID.
"""
line = ''
#Wait for device to send data
time.sleep(2)
if serial.inWaiting():
line = ''
c = ''
#Look for end sequence $$$
while '$$$' not in line:
c = serial.read().decode('utf-8', errors='replace') # we're supposed to get UTF8 text, but the board might behave otherwise
line += c
if "OpenBCI" in line:
return True
return False
def print_register_settings(self):
self.ser.write(b'?')
time.sleep(0.5)
self.print_incoming_text();
#DEBBUGING: Prints individual incoming bytes
def print_bytes_in(self):
if not self.streaming:
self.ser.write(b'b')
self.streaming = True
while self.streaming:
print(struct.unpack('B',self.ser.read())[0]);
'''Incoming Packet Structure:
Start Byte(1)|Sample ID(1)|Channel Data(24)|Aux Data(6)|End Byte(1)
0xA0|0-255|8, 3-byte signed ints|3 2-byte signed ints|0xC0'''
def print_packets_in(self):
while self.streaming:
b = struct.unpack('B', self.ser.read())[0];
if b == START_BYTE:
self.attempt_reconnect = False
if skipped_str:
logging.debug('SKIPPED\n' + skipped_str + '\nSKIPPED')
skipped_str = ''
packet_str = "%03d"%(b) + '|';
b = struct.unpack('B', self.ser.read())[0];
packet_str = packet_str + "%03d"%(b) + '|';
#data channels
for i in range(24-1):
b = struct.unpack('B', self.ser.read())[0];
packet_str = packet_str + '.' + "%03d"%(b);
b = struct.unpack('B', self.ser.read())[0];
packet_str = packet_str + '.' + "%03d"%(b) + '|';
#aux channels
for i in range(6-1):
b = struct.unpack('B', self.ser.read())[0];
packet_str = packet_str + '.' + "%03d"%(b);
b = struct.unpack('B', self.ser.read())[0];
packet_str = packet_str + '.' + "%03d"%(b) + '|';
#end byte
b = struct.unpack('B', self.ser.read())[0];
#Valid Packet
if b == END_BYTE:
packet_str = packet_str + '.' + "%03d"%(b) + '|VAL';
print(packet_str)
#logging.debug(packet_str)
#Invalid Packet
else:
packet_str = packet_str + '.' + "%03d"%(b) + '|INV';
#Reset
self.attempt_reconnect = True
else:
print(b)
if b == END_BYTE:
skipped_str = skipped_str + '|END|'
else:
skipped_str = skipped_str + "%03d"%(b) + '.'
if self.attempt_reconnect and (timeit.default_timer()-self.last_reconnect) > self.reconnect_freq:
self.last_reconnect = timeit.default_timer()
self.warn('Reconnecting')
self.reconnect()
def check_connection(self, interval = 2, max_packets_to_skip=10):
# stop checking when we're no longer streaming
if not self.streaming:
return
#check number of dropped packages and establish connection problem if too large
if self.packets_dropped > max_packets_to_skip:
#if error, attempt to reconect
self.reconnect()
# check again again in 2 seconds
threading.Timer(interval, self.check_connection).start()
def reconnect(self):
self.packets_dropped = 0
self.warn('Reconnecting')
self.stop()
time.sleep(0.5)
self.ser.write(b'v')
time.sleep(0.5)
self.ser.write(b'b')
time.sleep(0.5)
self.streaming = True
#self.attempt_reconnect = False
#Adds a filter at 60hz to cancel out ambient electrical noise
def enable_filters(self):
self.ser.write(b'f')
self.filtering_data = True;
def disable_filters(self):
self.ser.write(b'g')
self.filtering_data = False;
def test_signal(self, signal):
""" Enable / disable test signal """
if signal == 0:
self.ser.write(b'0')
self.warn("Connecting all pins to ground")
elif signal == 1:
self.ser.write(b'p')
self.warn("Connecting all pins to Vcc")
elif signal == 2:
self.ser.write(b'-')
self.warn("Connecting pins to low frequency 1x amp signal")
elif signal == 3:
self.ser.write(b'=')
self.warn("Connecting pins to high frequency 1x amp signal")
elif signal == 4:
self.ser.write(b'[')
self.warn("Connecting pins to low frequency 2x amp signal")
elif signal == 5:
self.ser.write(b']')
self.warn("Connecting pins to high frequency 2x amp signal")
else:
self.warn("%s is not a known test signal. Valid signals go from 0-5" %(signal))
def set_channel(self, channel, toggle_position):
""" Enable / disable channels """
#Commands to set toggle to on position
if toggle_position == 1:
if channel is 1:
self.ser.write(b'!')
if channel is 2:
self.ser.write(b'@')
if channel is 3:
self.ser.write(b'#')
if channel is 4:
self.ser.write(b'$')
if channel is 5:
self.ser.write(b'%')
if channel is 6:
self.ser.write(b'^')
if channel is 7:
self.ser.write(b'&')
if channel is 8:
self.ser.write(b'*')
if channel is 9 and self.daisy:
self.ser.write(b'Q')
if channel is 10 and self.daisy:
self.ser.write(b'W')
if channel is 11 and self.daisy:
self.ser.write(b'E')
if channel is 12 and self.daisy:
self.ser.write(b'R')
if channel is 13 and self.daisy:
self.ser.write(b'T')
if channel is 14 and self.daisy:
self.ser.write(b'Y')
if channel is 15 and self.daisy:
self.ser.write(b'U')
if channel is 16 and self.daisy:
self.ser.write(b'I')
#Commands to set toggle to off position
elif toggle_position == 0:
if channel is 1:
self.ser.write(b'1')
if channel is 2:
self.ser.write(b'2')
if channel is 3:
self.ser.write(b'3')
if channel is 4:
self.ser.write(b'4')
if channel is 5:
self.ser.write(b'5')
if channel is 6:
self.ser.write(b'6')
if channel is 7:
self.ser.write(b'7')
if channel is 8:
self.ser.write(b'8')
if channel is 9 and self.daisy:
self.ser.write(b'q')
if channel is 10 and self.daisy:
self.ser.write(b'w')
if channel is 11 and self.daisy:
self.ser.write(b'e')
if channel is 12 and self.daisy:
self.ser.write(b'r')
if channel is 13 and self.daisy:
self.ser.write(b't')
if channel is 14 and self.daisy:
self.ser.write(b'y')
if channel is 15 and self.daisy:
self.ser.write(b'u')
if channel is 16 and self.daisy:
self.ser.write(b'i')
def find_port(self):
# Finds the serial port names
if sys.platform.startswith('win'):
ports = ['COM%s' % (i+1) for i in range(256)]
elif sys.platform.startswith('linux') or sys.platform.startswith('cygwin'):
ports = glob.glob('/dev/ttyUSB*')
elif sys.platform.startswith('darwin'):
ports = glob.glob('/dev/tty.usbserial*')
else:
raise EnvironmentError('Error finding ports on your operating system')
openbci_port = ''
for port in ports:
try:
s = serial.Serial(port= port, baudrate = self.baudrate, timeout=self.timeout)
s.write(b'v')
openbci_serial = self.openbci_id(s)
s.close()
if openbci_serial:
openbci_port = port;
except (OSError, serial.SerialException):
pass
if openbci_port == '':
raise OSError('Cannot find OpenBCI port')
else:
return openbci_port
class OpenBCISample(object):
"""Object encapulsating a single sample from the OpenBCI board. NB: dummy imp for plugin compatiblity"""
def __init__(self, packet_id, channel_data, aux_data):
self.id = packet_id
self.channel_data = channel_data
self.aux_data = aux_data
self.imp_data = []