ADD: Wifi write function to pass through up to 30 ascii commands to the board at once - #51

Esse commit está contido em:
AJ Keller
2017-09-13 16:03:06 -04:00
commit fb54d1a203
2 arquivos alterados com 32 adições e 81 exclusões
+32 -75
Ver Arquivo
@@ -216,22 +216,23 @@ class OpenBCIWifi(object):
" with IPV4: " + list_ip[0])
return list_ip[0]
def ser_write(self, b):
"""Access serial port object for write"""
self.char_write.write(b)
def ser_read(self):
"""Access serial port object for read"""
return self.char_read.read()
def ser_inWaiting(self):
""" Slightly different from Cyton API, return True if ASCII messages are incoming."""
# FIXME: might have a slight problem with thread because of notifications...
if self.delegate.receiving_ASCII:
# in case the packet indicating the end of the message drops, we use a 1s timeout
if timeit.default_timer() - self.delegate.time_last_ASCII > 2:
self.delegate.receiving_ASCII = False
return self.delegate.receiving_ASCII
def wifi_write(self, output):
"""
Pass through commands from the WiFi Shield to the Carrier board
:param output:
:return:
"""
res_command_post = requests.post("http://%s/command" % self.ip_address,
json={'command': output})
if res_command_post.status_code == 200:
ret_val = res_command_post.text
if self.log:
print(ret_val)
return ret_val
else:
if self.log:
print("Error code: %d %s" % (res_command_post.status_code, res_command_post.text))
raise RuntimeError("Error code: %d %s" % (res_command_post.status_code, res_command_post.text))
def getSampleRate(self):
return SAMPLE_RATE
@@ -282,26 +283,22 @@ class OpenBCIWifi(object):
# # Checking connection -- timeout and packets dropped
# self.check_connection()
def waitForNotifications(self, delay):
""" Allow some time for the board to receive new data. """
self.gang.waitForNotifications(delay)
def test_signal(self, signal):
""" Enable / disable test signal """
if signal == 0:
self.warn("Disabling synthetic square wave")
try:
self.char_write.write(b']')
self.wifi_write(']')
except Exception as e:
print("Something went wrong while setting signal: " + str(e))
elif signal == 1:
self.warn("Eisabling synthetic square wave")
try:
self.char_write.write(b'[')
self.wifi_write('[')
except Exception as e:
print("Something went wrong while setting signal: " + str(e))
else:
self.warn("%s is not a known test signal. Valid signal is 0-1" % (signal))
self.warn("%s is not a known test signal. Valid signal is 0-1" % signal)
def set_channel(self, channel, toggle_position):
""" Enable / disable channels """
@@ -309,23 +306,23 @@ class OpenBCIWifi(object):
# Commands to set toggle to on position
if toggle_position == 1:
if channel is 1:
self.ser.write(b'!')
self.wifi_write('!')
if channel is 2:
self.ser.write(b'@')
self.wifi_write('@')
if channel is 3:
self.ser.write(b'#')
self.wifi_write('#')
if channel is 4:
self.ser.write(b'$')
self.wifi_write('$')
# Commands to set toggle to off position
elif toggle_position == 0:
if channel is 1:
self.ser.write(b'1')
self.wifi_write('1')
if channel is 2:
self.ser.write(b'2')
self.wifi_write('2')
if channel is 3:
self.ser.write(b'3')
self.wifi_write('3')
if channel is 4:
self.ser.write(b'4')
self.wifi_write('4')
except Exception as e:
print("Something went wrong while setting channels: " + str(e))
@@ -342,28 +339,19 @@ class OpenBCIWifi(object):
try:
if self.impedance:
print("Stopping with impedance testing")
self.ser_write(b'Z')
self.wifi_write('Z')
else:
self.ser_write(b's')
self.wifi_write('s')
except Exception as e:
print("Something went wrong while asking the board to stop streaming: " + str(e))
if self.log:
logging.warning('sent <s>: stopped streaming')
def disconnect(self):
if (self.streaming == True):
if self.streaming:
self.stop()
print("Closing BLE..")
try:
self.char_discon.write(b' ')
except Exception as e:
print("Something went wrong while asking the board to disconnect: " + str(e))
# should not try to read/write anything after that, will crash
try:
self.gang.disconnect()
except Exception as e:
print("Something went wrong while shutting down BLE link: " + str(e))
logging.warning('BLE closed')
"""
@@ -413,37 +401,6 @@ class OpenBCISample(object):
self.imp_data = imp_data
def conv24bitsToInt(unpacked):
""" Convert 24bit data coded on 3 bytes to a proper integer """
if len(unpacked) != 3:
raise ValueError("Input should be 3 bytes long.")
# FIXME: quick'n dirty, unpack wants strings later on
literal_read = struct.pack('3B', unpacked[0], unpacked[1], unpacked[2])
# 3byte int in 2s compliment
if (unpacked[0] > 127):
pre_fix = bytes(bytearray.fromhex('FF'))
else:
pre_fix = bytes(bytearray.fromhex('00'))
literal_read = pre_fix + literal_read
# unpack little endian(>) signed integer(i) (makes unpacking platform independent)
myInt = struct.unpack('>i', literal_read)[0]
return myInt
def conv8bitToInt8(byte):
""" Convert one byte to signed value """
if byte > 127:
return (256 - byte) * (-1)
else:
return byte
class WiFiShieldHandler(asyncore.dispatcher_with_send):
def handle_read(self):
-6
Ver Arquivo
@@ -16,16 +16,10 @@ def printData(sample):
if __name__ == '__main__':
shield_name = 'OpenBCI-E2B6'
#port = '/dev/tty.OpenBCI-DN0096XA'
# baud = 115200
logging.basicConfig(filename="test.log",format='%(asctime)s - %(levelname)s : %(message)s',level=logging.DEBUG)
logging.info('---------LOG START-------------')
shield = bci.OpenBCIWifi(shield_name=shield_name, log=True)
print("WiFi Shield Instantiated")
# shield.stream_start()
# board.ser.write('v')
# time.sleep(10)
shield.start_streaming(printData)
shield.loop()
# board.print_bytes_in()