FIX: Streaming json data over wifi - #51

Esse commit está contido em:
AJ Keller
2017-09-13 15:48:50 -04:00
commit ec291ecceb
2 arquivos alterados com 101 adições e 115 exclusões
+99 -115
Ver Arquivo
@@ -29,6 +29,7 @@ import xmltodict
import re
import asyncore
import socket
import requests
SAMPLE_RATE = 0 # Hz
@@ -56,7 +57,7 @@ class OpenBCIWifi(object):
"""
def __init__(self, ip_address=None, shield_name=None, sample_rate=None, log=True, timeout=2,
max_packets_to_skip=20):
max_packets_to_skip=20, latency=10000):
# these one are used
self.log = log # print_incoming_text needs log
self.streaming = False
@@ -66,38 +67,42 @@ class OpenBCIWifi(object):
self.ip_address = ip_address
self.shield_name = shield_name
self.sample_rate = sample_rate
self.latency = latency
# might be handy to know API
self.board_type = "none"
if self.log:
print("Welcome to OpenBCI Native WiFi Shield Driver - Please contribute code!")
if ip_address is None:
self.ip_address = self.find_wifi_shield()
self.local_ip_address = self._get_local_ip_address()
# Intentionally bind to port 0
self.local_wifi_server = WiFiShieldServer(self.local_ip_address, 0)
asyncore.loop()
if self.log:
print("Opened socket on %s:%d" % (self.local_ip_address, self.local_wifi_server.getsockname()[1]))
self.connect()
self.streaming = False
# number of EEG channels and (optionally) accelerometer channel
self.eeg_channels_per_sample = 4
self.aux_channels_per_sample = 3
self.imp_channels_per_sample = 5
# number of EEG channels
self.eeg_channels_per_sample = 0
self.read_state = 0
self.log_packet_count = 0
self.packets_dropped = 0
self.time_last_packet = 0
if self.log:
print("Welcome to OpenBCI Native WiFi Shield Driver - Please contribute code!")
self.local_ip_address = self._get_local_ip_address()
# Intentionally bind to port 0
self.local_wifi_server = WiFiShieldServer(self.local_ip_address, 0)
self.local_wifi_server_port = self.local_wifi_server.getsockname()[1]
if self.log:
print("Opened socket on %s:%d" % (self.local_ip_address, self.local_wifi_server_port))
if ip_address is None:
self.find_wifi_shield(wifi_shield_cb=self.on_shield_found)
else:
self.on_shield_found(ip_address)
def on_shield_found(self, ip_address):
self.ip_address = ip_address
self.connect()
# Disconnects from board when terminated
atexit.register(self.disconnect)
def loop(self):
asyncore.loop()
def _get_local_ip_address(self):
"""
Gets the local ip address of this computer
@@ -125,68 +130,54 @@ class OpenBCIWifi(object):
if self.log:
print ("Init WiFi connection with IP: " + self.ip_address)
"""
Docs on these HTTP requests and more are found:
https://app.swaggerhub.com/apis/pushtheworld/openbci-wifi-server/1.3.0
"""
self.gang = Peripheral(self.port, 'random') # ADDR_TYPE_RANDOM
res_board = requests.get("http://%s/board" % self.ip_address)
print ("Get mainservice...")
self.service = self.gang.getServiceByUUID(BLE_SERVICE)
print ("Got:" + str(self.service))
if res_board.status_code == 200:
board_info = res_board.json()
if not board_info['board_connected']:
raise RuntimeError("No board connected to WiFi Shield. Learn at docs.openbci.com")
self.board_type = board_info['board_type']
self.eeg_channels_per_sample = board_info['num_channels']
if self.log:
print("Connected to %s with %s channels" % (self.board_type, self.eeg_channels_per_sample))
print ("Get characteristics...")
self.char_read = self.service.getCharacteristics(BLE_CHAR_RECEIVE)[0]
print ("receive, properties: " + str(self.char_read.propertiesToString()) + ", supports read: " + str(
self.char_read.supportsRead()))
self.char_write = self.service.getCharacteristics(BLE_CHAR_SEND)[0]
print ("write, properties: " + str(self.char_write.propertiesToString()) + ", supports read: " + str(
self.char_write.supportsRead()))
self.char_discon = self.service.getCharacteristics(BLE_CHAR_DISCONNECT)[0]
print ("disconnect, properties: " + str(self.char_discon.propertiesToString()) + ", supports read: " + str(
self.char_discon.supportsRead()))
# set delegate to handle incoming data
# self.delegate = GanglionDelegate(self.scaling_output)
# self.gang.setDelegate(self.delegate)
# enable AUX channel
if self.aux:
print("Enabling AUX data...")
try:
self.ser_write(b'n')
except Exception as e:
print("Something went wrong while enabling aux channels: " + str(e))
print("Turn on notifications")
# nead up-to-date bluepy, cf https://github.com/IanHarvey/bluepy/issues/53
self.desc_notify = self.char_read.getDescriptors(forUUID=0x2902)[0]
try:
self.desc_notify.write(b"\x01")
except Exception as e:
print("Something went wrong while trying to enable notification: " + str(e))
print("Connection established")
res_tcp_post = requests.post("http://%s/tcp" % self.ip_address,
json={
'ip': self.local_ip_address,
'port': self.local_wifi_server_port,
'output': 'json',
'delimiter': True,
'latency': self.latency
})
if res_tcp_post.status_code == 200:
tcp_status = res_tcp_post.json()
if tcp_status['connected']:
if self.log:
print("WiFi Shield to Python TCP Socket Established")
else:
raise RuntimeWarning("WiFi Shield is not able to connect to local server. Please open an issue.")
def init_streaming(self):
""" Tell the board to record like crazy. """
try:
if self.impedance:
print("Starting with impedance testing")
self.ser_write(b'z')
else:
self.ser_write(b'b')
except Exception as e:
print("Something went wrong while asking the board to start streaming: " + str(e))
self.streaming = True
self.packets_dropped = 0
self.time_last_packet = timeit.default_timer()
res_stream_start = requests.get("http://%s/stream/start" % self.ip_address)
if res_stream_start.status_code == 200:
self.streaming = True
self.packets_dropped = 0
self.time_last_packet = timeit.default_timer()
else:
raise EnvironmentError("Unable to start streaming. Check API for status code %d on /stream/start" % res_stream_start.status_code)
def find_wifi_shield(self, shield_name=None):
def find_wifi_shield(self, shield_name=None, wifi_shield_cb=None):
"""Detects Ganglion board MAC address -- if more than 1 around, will select first. Needs root privilege."""
print("Try to find WiFi shields on your local wireless network")
scan_time = 5
print("Scanning for 5 seconds nearby devices...")
if self.log:
print("Try to find WiFi shields on your local wireless network")
print("Scanning for 5 seconds nearby devices...")
list_ip = []
list_id = []
@@ -200,21 +191,22 @@ class OpenBCIWifi(object):
cur_ip_address = re.findall(r'[0-9]+(?:\.[0-9]+){3}', cur_base_url)[0]
list_id.append(cur_shield_name)
list_ip.append(cur_ip_address)
if shield_name is not None:
found_shield = True
if shield_name is None:
print("Found WiFi Shield %s with IP Address %s" % (cur_shield_name, cur_ip_address))
if wifi_shield_cb is not None:
wifi_shield_cb(cur_ip_address)
else:
if shield_name == cur_shield_name:
found_shield = True
return cur_ip_address
if wifi_shield_cb is not None:
wifi_shield_cb(cur_ip_address)
ssdp_hits = ssdp.discover("urn:schemas-upnp-org:device:Basic:1", timeout=3, wifi_found_cb=wifi_shield_found)
nb_wifi_shields = 0
if not found_shield:
nb_wifi_shields = len(list_id)
else:
nb_wifi_shields = 1
nb_wifi_shields = len(list_id)
if nb_wifi_shields < 1:
print("No WiFi Shield found ;(")
print("No WiFi Shields found ;(")
raise OSError('Cannot find OpenBCI WiFi Shield with local name')
if nb_wifi_shields > 1:
@@ -248,14 +240,6 @@ class OpenBCIWifi(object):
"""Will not get new data on impedance check."""
return self.eeg_channels_per_sample
def getNbAUXChannels(self):
"""Might not be used depending on the mode."""
return self.aux_channels_per_sample
def getNbImpChannels(self):
"""Might not be used depending on the mode."""
return self.imp_channels_per_sample
def start_streaming(self, callback, lapse=-1):
"""
Start handling streaming data from the board. Call a provided callback
@@ -274,29 +258,29 @@ class OpenBCIWifi(object):
if not isinstance(callback, list):
callback = [callback]
while self.streaming:
# should the board get disconnected and we could not wait for notification anymore, a reco should be attempted through timeout mechanism
try:
# at most we will get one sample per packet
self.waitForNotifications(1. / self.getSampleRate())
except Exception as e:
print("Something went wrong while waiting for a new sample: " + str(e))
# retrieve current samples on the stack
samples = self.delegate.getSamples()
self.packets_dropped = self.delegate.getMaxPacketsDropped()
if samples:
self.time_last_packet = timeit.default_timer()
for call in callback:
for sample in samples:
call(sample)
if (lapse > 0 and timeit.default_timer() - start_time > lapse):
self.stop();
if self.log:
self.log_packet_count = self.log_packet_count + 1;
# Checking connection -- timeout and packets dropped
self.check_connection()
# while self.streaming:
# # should the board get disconnected and we could not wait for notification anymore, a reco should be attempted through timeout mechanism
# try:
# # at most we will get one sample per packet
# self.waitForNotifications(1. / self.getSampleRate())
# except Exception as e:
# print("Something went wrong while waiting for a new sample: " + str(e))
# # retrieve current samples on the stack
# samples = self.delegate.getSamples()
# self.packets_dropped = self.delegate.getMaxPacketsDropped()
# if samples:
# self.time_last_packet = timeit.default_timer()
# for call in callback:
# for sample in samples:
# call(sample)
#
# if (lapse > 0 and timeit.default_timer() - start_time > lapse):
# self.stop();
# if self.log:
# self.log_packet_count = self.log_packet_count + 1;
#
# # Checking connection -- timeout and packets dropped
# self.check_connection()
def waitForNotifications(self, delay):
""" Allow some time for the board to receive new data. """
+2
Ver Arquivo
@@ -26,4 +26,6 @@ if __name__ == '__main__':
# board.ser.write('v')
# time.sleep(10)
shield.start_streaming(printData)
shield.loop()
# board.print_bytes_in()