Merge pull request #1 from OpenBCI/master

Update
Esse commit está contido em:
Alejandro Azocar
2015-02-12 10:13:44 -06:00
6 arquivos alterados com 441 adições e 79 exclusões
+60
Ver Arquivo
@@ -0,0 +1,60 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
# C extensions
*.so
# Distribution / packaging
.Python
env/
build/
develop-eggs/
dist/
downloads/
eggs/
lib/
lib64/
parts/
sdist/
var/
*.egg-info/
.installed.cfg
*.egg
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.coverage
.cache
nosetests.xml
coverage.xml
# Translations
*.mo
*.pot
# Django stuff:
*.log
# Sphinx documentation
docs/_build/
# PyBuilder
target/
# Vim temp files
.*.swp
# CSV
*.csv
+137
Ver Arquivo
@@ -2,3 +2,140 @@ OpenBCI_Python
==============
The Python software library designed to work with OpenBCI hardware.
Please direct any questions, suggestions and bug reports to the github repo at: https://github.com/OpenBCI/OpenBCI_Python
##Dependancies:
Python 2.7 or later (https://www.python.org/download/releases/2.7/)
Numpy 1.7 or later (http://www.numpy.org/)
OpenBCI 8 and 32 bit board with 8 channels.
This library includes the main open_bci_v3 class definition that instantiates an OpenBCI Board object. This object will initialize communication with the board and get the environment ready for data streaming. This library is designed to work with iOS and Linux distributions. To use a Windows OS, change the __init__ function in open_bci_v3.py to establish a serial connection in Windows.
For additional details on connecting your board visit: http://docs.openbci.com/tutorials/01-GettingStarted
##Audience:
This python code is meant to be used by people familiar with python and programming in general. It's purpose is to allow for programmers to interface with OpenBCI technology directly, both to acquire data and to write programs that can use that data on a live setting, using python.
If this is not what you are looking for, you can visit http://openbci.com/downloads and browse other OpenBCI software that will fit your needs.
##Functionality:
The startStreaming function of the Board object takes a callback function and begins streaming data from the board. Each packet it receives is then parsed as an OpenBCISample which is passed to the callback function as an argument.
OpenBCISample members:
-id:
int from 0-255. Used to tell if packets were skipped.
-channel_data:
8 int array with current voltage value of each channel (1-8)
-aux_data:
3 int array with current auxiliary data. (0s by default)
###User.py
For initial testing, this code provides a simple user interface (called user.py). To use it, connect the board to your computer using the dongle (see http://docs.openbci.com/tutorials/01-GettingStarted for details).
Then simply run the code given as an argument the port your board is connected to:
Ex Linux:
> $python user.py -p /dev/ttyUSB0
The program should establish a serial connection and reset the board to default settings. When a '-->' appears, you can type a character (character map http://docs.openbci.com/software/01-OpenBCI_SDK) that will be sent to the board using ser.write. This allows you to change the settings on the board.
A good first test is to try is to type '?':
>--> ?
This should output the current configuration settings on the board.
Another test would be to change the board settings so that all the pins in the board are internally connected to a test (square) wave. To do this, type:
>--> [
Alternatively, there are 6 test signals pre configured:
> --> /test1 (connect all pins to ground)
> --> /test2 (connect all pins to vcc)
> --> /test3 (Connecting pins to low frequency 1x amp signal)
> --> /test4 (Connecting pins to high frequency 1x amp signal)
> --> /test5 (Connecting pins to low frequency 2x amp signal)
> --> /test6 (Connecting pins to high frequency 2x amp signal)
The / is used in the interface to execute a pre-configured command. Writing anything without a preceding '/' will automatically write those characters, one by one, to the board.
For example, writing
> -->x3020000X
will do the following:
x enters Channel Settings mode. Channel 3 is set up to be powered up, with gain of 2, normal input, removed from BIAS generation, removed from SRB2, removed from SRB1. The final X latches the settings to the ADS1299 channel settings register.
Pre-configured commands that use the / prefix are:
test (As explained above)
> --> /test4
csv (Set the start command to record data to a CSV file)
> --> /csv
start (Start EEG streaming using the most recently defined callback, printData by default)
> --> /start
Adding the argument "T:number" will set a timeout on the start command.
For example, to record CSV data for 5 seconds type:
>-->/csv
>-->/start T:5
To use your own function as a callback just define your function and substitute in line 31 like so:
```python
fun = yourFunction()
```
#### Useful commands:
Writting to SD card a high frequency square wave (test5) for 3 seconds:
```
$ python user.py -p /dev/ttyUSB0
User serial interface enabled...
Connecting to /dev/ttyUSB0
Serial established...
View command map at http://docs.openbci.com.
Type start to run. Type exit to exit.
-->
OpenBCI V3 8bit Board
Setting ADS1299 Channel Values
ADS1299 Device ID: 0x3E
LIS3DH Device ID: 0x33
Free RAM: 447
$$$
--> /test5
Warning: Connecting pins to high frequency 2x amp signal
--> a
Corresponding SD file OBCI_18.TXT$$$
--> /start T:3
```
NOTES:
When writing to the board and expecting a response, give the board a second. It sometimes lags and requires
the user to hit enter on the user.py script until you get a response.
+39
Ver Arquivo
@@ -0,0 +1,39 @@
import csv
import time
class csv_collect(object):
def __init__(self, file_name="collect.csv", delim = ","):
self.file_name = file_name
self.start_time = time.time()
self.delim = delim
open(self.file_name, 'w').close()
def __call__(self, sample):
t = time.time() - self.start_time
#print timeSinceStart|Sample Id
print("%f | %d" %(t,sample.id))
row = ''
row += str(t)
row += self.delim
row += str(sample.id)
row += self.delim
for i in sample.channel_data:
row += str(i)
row += self.delim
for i in sample.aux_data:
row += str(i)
row += self.delim
#remove last comma
row += '\n'
with open(self.file_name, 'a') as f:
f.write(row)
+111 -77
Ver Arquivo
@@ -16,6 +16,7 @@ board.start(handle_sample)
import serial
import struct
import numpy as np
import time
SAMPLE_RATE = 250.0 # Hz
START_BYTE = bytes(0xA0) # start of data packet
@@ -40,7 +41,7 @@ scale_fac_uVolts_per_count = ADS1299_Vref/(pow(2,23)-1)/(ADS1299_gain*1000000.);
# command_biasAuto = "`";
# command_biasFixed = "~";
class OpenBCIBoard(object):
"""
@@ -52,61 +53,60 @@ class OpenBCIBoard(object):
"""
def __init__(self, port=None, baud=115200, filter_data=True):
def __init__(self, port=None, baud=115200, filter_data=True,
scaled_output=True):
if not port:
port = find_port()
if not port:
raise OSError('Cannot find OpenBCI port')
self.ser = serial.Serial(port, baud)
self.dump_registry_data()
print("Serial established...")
#Initialize 32-bit board, doesn't affect 8bit board
self.ser.write('v');
#wait for device to be ready
time.sleep(1)
self.print_incoming_text()
self.streaming = False
self.filtering_data = filter_data
self.scaling_output = scaled_output
self.channels = 8
self.read_state = 0;
# Searching for start(0), sample_count(1),read data(2), read aux(3), search last(4)
def printIn(self):
def printBytesIn(self):
#DEBBUGING: Prints individual incoming bytes
if not self.streaming:
# if self.filtering_data:
# self.warn('Enabling filter')
# self.ser.write('F')
# print(self.ser.readline())
# Send an 'b' to the board to tell it to start streaming us text.
self.ser.write('b')
# Dump the first line that says "Arduino: Starting..."
self.streaming = True
while self.streaming:
print(struct.unpack('B',self.ser.read())[0]);
def start(self, callback):
def startStreaming(self, callback, lapse=-1):
"""
Start handling streaming data from the board. Call a provided callback
for every single sample that is processed.
Args:
callback: A callback function that will receive a single argument of the
OpenBCISample object captured.
"""
if not self.streaming:
# if self.filtering_data:
# self.warn('Enabling filter')
# self.ser.write('F')
# print(self.ser.readline())
# Send an 'b' to the board to tell it to start streaming us text.
self.ser.write('b')
# Dump the first line that says "Arduino: Starting..."
self.ser.readline()
self.streaming = True
start_time = time.time()
while self.streaming:
#data = self.ser.readline()
sample = self._read_serial_binary()
callback(sample)
if(lapse > 0 and time.time() - start_time > lapse):
self.streaming = False
#If exited, stop streaming
self.ser.write('s')
"""
@@ -114,40 +114,50 @@ class OpenBCIBoard(object):
"""
def stop(self):
self.warn("Stopping streaming")
self.streaming = False
def disconnect(self):
self.stop()
self.warn("Closing Serial")
self.ser.close()
self.streaming = False
"""
SETTINGS AND HELPERS
"""
SETTINGS AND HELPERS
"""
def dump_registry_data(self):
def print_incoming_text(self):
"""
When starting the connection, dump all the debug data until
we get to a line with something about streaming data.
When starting the connection, print all the debug data until
we get to a line with the end sequence '$$$'.
"""
line = ''
while 'begin streaming data' not in line:
line = self.ser.readline()
#Wait for device to send data
time.sleep(0.5)
if self.ser.inWaiting():
print("-------------------")
line = ''
c = ''
#Look for end sequence $$$
while '$$$' not in line:
c = self.ser.read()
line += c
print(line);
print("-------------------\n")
def print_register_settings(self):
self.ser.write('?')
for number in xrange(0, 24):
print(self.ser.readline())
time.sleep(0.5)
print_incoming_text();
"""
Adds a filter at 60hz to cancel out ambient electrical noise.
"""
def enable_filters(self):
self.ser.write('f')
@@ -158,15 +168,24 @@ class OpenBCIBoard(object):
self.filtering_data = False;
def warn(self, text):
print(text)
print("Warning: %s" % text)
"""
Parses incoming data packet into OpenBCISample.
Incoming Packet Structure:
Start Byte(1)|Sample ID(1)|Channel Data(24)|Aux Data(6)|End Byte(1)
0xA0|0-255|8, 3-byte signed ints|3 2-byte signed ints|0xC0
"""
def _read_serial_binary(self, max_bytes_to_skip=3000):
def read(n):
b = self.ser.read(n)
# print bytes(b)ar
# print bytes(b)
return b
for rep in xrange(max_bytes_to_skip):
#Looking for start and save id when found
if self.read_state == 0:
b = read(1)
@@ -180,38 +199,36 @@ class OpenBCIBoard(object):
if(rep != 0):
self.warn('Skipped %d bytes before start found' %(rep))
packet_id = struct.unpack('B', read(1))[0] #packet id goes from 0-255
self.read_state = 1
#CHECK THIS
elif self.read_state == 1:
channel_data = []
for c in xrange(self.channels):
#3 byte ints
literal_read = read(3)
unpacked = struct.unpack('3B', literal_read)
#3byte int in 2s compliment
if (unpacked[0] >= 127):
if (unpacked[0] >= 127):
pre_fix = '\xFF'
else:
pre_fix = '\x00'
literal_read = pre_fix + literal_read;
literal_read = pre_fix + literal_read;
#unpack little endian(>) signed integer(i)
#also makes unpacking platform independent
myInt = struct.unpack('>i', literal_read)
myInt = struct.unpack('>i', literal_read)[0]
if self.scaling_output:
channel_data.append(myInt*scale_fac_uVolts_per_count)
else:
channel_data.append(myInt)
channel_data.append(myInt[0]*scale_fac_uVolts_per_count)
# # Debug
# unpacked_final = struct.unpack('4B', literal_read)
# print unpacked
# print unpacked_final
# print myInt
self.read_state = 2;
@@ -219,12 +236,11 @@ class OpenBCIBoard(object):
aux_data = []
for a in xrange(3):
#short(h)
#short(h)
acc = struct.unpack('h', read(2))[0]
aux_data.append(acc)
self.read_state = 3;
self.read_state = 3;
elif self.read_state == 3:
val = bytes(struct.unpack('B', read(1))[0])
@@ -234,34 +250,52 @@ class OpenBCIBoard(object):
return sample
else:
self.warn("Warning: Unexpected END_BYTE found <%s> instead of <%s>,\
discarted packet with id <%d>"
discarted packet with id <%d>"
%(val, END_BYTE, packet_id))
def _interprate_stream(self, b):
print ("interprate")
def test_signal(self, signal):
if signal == 0:
self.ser.write('0')
self.warn("Connecting all pins to ground")
elif signal == 1:
self.ser.write('p')
self.warn("Connecting all pins to Vcc")
elif signal == 2:
self.ser.write('-')
self.warn("Connecting pins to low frequency 1x amp signal")
elif signal == 3:
self.ser.write('=')
self.warn("Connecting pins to high frequency 1x amp signal")
elif signal == 4:
self.ser.write('[')
self.warn("Connecting pins to low frequency 2x amp signal")
elif signal == 5:
self.ser.write(']')
self.warn("Connecting pins to high frequency 2x amp signal")
else:
self.warn("%s is not a known test signal. Valid signals go from 0-5" %(signal))
def set_channel(self, channel, toggle_position):
#Commands to set toggle to on position
if toggle_position == 1:
if toggle_position == 1:
if channel is 1:
self.ser.write('q')
self.ser.write('!')
if channel is 2:
self.ser.write('w')
self.ser.write('@')
if channel is 3:
self.ser.write('e')
self.ser.write('#')
if channel is 4:
self.ser.write('r')
self.ser.write('$')
if channel is 5:
self.ser.write('t')
self.ser.write('%')
if channel is 6:
self.ser.write('y')
self.ser.write('^')
if channel is 7:
self.ser.write('u')
self.ser.write('&')
if channel is 8:
self.ser.write('i')
self.ser.write('*')
#Commands to set toggle to off position
elif toggle_position == 0:
elif toggle_position == 0:
if channel is 1:
self.ser.write('1')
if channel is 2:
@@ -284,8 +318,8 @@ class OpenBCISample(object):
"""Object encapulsating a single sample from the OpenBCI board."""
def __init__(self, packet_id, channel_data, aux_data):
self.id = packet_id;
self.channels = channel_data;
self.channel_data = channel_data;
self.aux_data = aux_data;
+2 -2
Ver Arquivo
@@ -5,7 +5,7 @@ def printData(sample):
#os.system('clear')
print "----------------"
print("%f" %(sample.id))
print sample.channels
print sample.channel_data
print sample.aux_data
print "----------------"
@@ -15,4 +15,4 @@ if __name__ == '__main__':
port = '/dev/ttyUSB0'
baud = 115200
board = bci.OpenBCIBoard(port=port)
board.start(printData)
board.startStreaming(printData)
+92
Ver Arquivo
@@ -0,0 +1,92 @@
#!/usr/bin/env python2.7
import argparse # new in Python2.7
import open_bci_v3 as bci
import os
import time
import csv_collect
import string
def printData(sample):
#os.system('clear')
print "----------------"
print("%f" %(sample.id))
print sample.channel_data
print sample.aux_data
if __name__ == '__main__':
parser = argparse.ArgumentParser(description="OpenBCI 'user'")
parser.add_argument('-p', '--port', required=True,
help="Port to connect to OpenBCI Dongle " +
"( ex /dev/ttyUSB0 or /dev/tty.usbserial-* )")
# baud rate is not currently used
parser.add_argument('-b', '--baud', default=115200, type=int,
help="Baud rate (not currently used)")
parser.add_argument('-c', '--cvs', action="store_true",
help="write cvs data")
args = parser.parse_args()
if args.cvs:
fun = csv_collect.csv_collect()
else:
fun = printData
print "User serial interface enabled..."
print "Connecting to ", args.port
board = bci.OpenBCIBoard(port=args.port)
print "View command map at http://docs.openbci.com."
print "Type start to run. Type exit to exit."
#Start by restoring default settings
s = 'd'
while(s != "exit"):
#Send char and wait for registers to set
if (not s): pass
elif("help" in s): print "View command map at: \
http://docs.openbci.com/software/01-OpenBCI_SDK.\n\
For user interface, read README or view \
https://github.com/OpenBCI/OpenBCI_Python"
elif('/' == s[0]):
s = s[1:]
if("T:" in s):
lapse = int(s[string.find(s,"T:")+2:])
else:
lapse = -1
if("start" in s):
board.startStreaming(fun, lapse)
elif(s == 'csv'):
print("/start will run csv_collect")
fun = csv_collect.csv_collect()
elif('test' in s):
test = int(s[string.find(s,"test")+4:])
board.test_signal(test)
elif s:
for c in s:
board.ser.write(c)
time.sleep(0.035)
line = ''
time.sleep(0.1) #Wait to see if the board has anything to report
while board.ser.inWaiting():
c = board.ser.read()
line += c
time.sleep(0.001)
if (c == '\n'):
print(line[:-1])
line = ''
print(line)
#Take user input
s = raw_input('--> ');
board.disconnect()