Arquivos
dreamento/ZmaxHeadband.py
2022-07-06 11:54:57 +02:00

195 linhas
7.0 KiB
Python

import numpy as np
import time
from ZmaxSocket import ZmaxSocket
import enum
class ZmaxDataID(enum.Enum):
eegr = 0
eegl = 1
dx = 2
dy = 3
dz = 4
bodytemp = 5
bat = 6
noise = 7
light = 8
nasal_l = 9
nasal_r = 10
oxy_ir_ac = 11
oxy_r_ac = 12
oxy_dark_ac = 13
oxy_ir_dc = 14
oxy_r_dc = 15
oxy_dark_dc = 16
def connect():
socket = ZmaxSocket()
socket.connect()
print(socket.serverConnected)
if socket.serverConnected:
socket.sendString('HELLO\n')
time.sleep(0.3) # sec
return socket
else:
return None
class ZmaxHeadband():
def __init__(self):
self.buf_size = 3 * 256 # 3 seconds at 256 frames per second (plotting can be SLOW)
self.buf_eeg1 = np.zeros((self.buf_size, 1))
self.buf_eeg2 = np.zeros((self.buf_size, 1))
self.buf_dx = np.zeros((self.buf_size, 1))
self.buf_dy = np.zeros((self.buf_size, 1))
self.buf_dz = np.zeros((self.buf_size, 1))
self.socket = connect()
self.msgn = 1 # message number for sending stimulation
def read(self, reqIDs=[0, 1]):
"""
output refers to a list of the desired outputs of the function for example [0,1,3] returns [eegl, eegr, dy]
[0=eegr, 1=eegl, 2=dx, 3=dy, 4=dz, 5=bodytemp, 6=bat, 7=noise, 8=light, 9=nasal_l, 10=nasal_r, 11=oxy_ir_ac,
12=oxy_r_ac, 13=oxy_dark_ac, 14=oxy_ir_dc, 15=oxy_r_dc, 16=oxy_dark_dc]
"""
reqVals = []
buf = self.socket.receive_oneLineBuffer()
if str.startswith(buf, 'DEBUG'): # ignore debugging messages from server
pass
else:
if str.startswith(buf, 'D'): # only process data packets
p = buf.split('.')
if (len(p) == 2):
buf = p[1]
packet_type = self.getbyteat(buf, 0)
if ((packet_type >= 1) and (packet_type <= 11)): # packet type within correct range
if (len(buf) == 119):
# EEG channels
eegr = self.getwordat(buf, 1)
eegl = self.getwordat(buf, 3)
# Accelerometer channels
dx = self.getwordat(buf, 5)
dy = self.getwordat(buf, 7)
dz = self.getwordat(buf, 9)
# PPG channels (not plotted)
oxy_ir_ac = self.getwordat(buf, 27) # requires external nasal sensor
oxy_r_ac = self.getwordat(buf, 25) # requires external nasal sensor
oxy_dark_ac = self.getwordat(buf, 34) # requires external nasal sensor
oxy_ir_dc = self.getwordat(buf, 17) # requires external nasal sensor
oxy_r_dc = self.getwordat(buf, 15) # requires external nasal sensor
oxy_dark_dc = self.getwordat(buf, 32) # requires external nasal sensor
# other channels (not plotted)
bodytemp = self.getwordat(buf, 36)
nasal_l = self.getwordat(buf, 11) # requires external nasal sensor
nasal_r = self.getwordat(buf, 13) # requires external nasal sensor
light = self.getwordat(buf, 21)
bat = self.getwordat(buf, 23)
noise = self.getwordat(buf, 19)
# convert
eegr, eegl = self.ScaleEEG(eegr), self.ScaleEEG(eegl)
dx, dy, dz = self.ScaleAccel(dx), self.ScaleAccel(dy), self.ScaleAccel(dz)
bodytemp = self.BodyTemp(bodytemp)
bat = self.BatteryVoltage(bat)
# for function return
result = [eegr, eegl, dx, dy, dz, bodytemp, bat, noise, light, nasal_l, nasal_r, \
oxy_ir_ac, oxy_r_ac, oxy_dark_ac, oxy_ir_dc, oxy_r_dc, oxy_dark_dc]
for i in reqIDs:
reqVals.append(result[i])
return reqVals
def getbyteat(self, buf, idx=0):
"""
for example getbyteat("08-80-56-7F-EA",0) -> hex2dec(08)
getbyteat("08-80-56-7F-EA",2) -> hex2dec(56)
"""
s = buf[idx * 3:idx * 3 + 2]
return self.hex2dec(s)
def getwordat(self, buf, idx=0):
w = self.getbyteat(buf, idx) * 256 + self.getbyteat(buf, idx + 1)
return w
def ScaleEEG(self, e): # word value to uV
uvRange = 3952;
d = e - 32768;
d = d * uvRange;
d = d / 65536
return d
def ScaleAccel(self, dx): # word value to 'g'
d = dx * 4 / 4096 - 2
return d
def BatteryVoltage(self, vbat): # word value to Volts
v = vbat / 1024 * 6.60;
return v
def BodyTemp(self, bodytemp): # word value to degrees C
v = bodytemp / 1024 * 3.3
t = 15 + ((v - 1.0446) / 0.0565537333333333)
return t
def hex2dec(self, s):
"""return the integer value of a hexadecimal string s"""
return int(s, 16)
def dec2hex(self, n, pad=0):
"""return the hexadecimal string representation of integer n"""
s = "%X" % n
if pad == 0:
return s
else:
# for example if pad = 3, the dec2hex(5,2) = '005'
return s.rjust(pad, '0')
def stimulate(self, rgb1=(0, 0, 2), rgb2=(0, 0, 2), pwm1=254, pwm2=0, t1=1, t2=3, reps=5, vib=1, alt=0):
"""
example:
LIVEMODE_SENDBYTES 15 6 111 04-00-00-02-00-00-02-FE-00-01-03-05-01-00\r\n
command = "LIVEMODE_SENDBYTES"
retries = 15
msgn = 6
retry_ms = 111
LIVECMD_FLASHLEDS = 04
r = 00
g = 00
b = 02
r2 = 00
g2 = 00
b2 = 02
pwm1 = FE (254); intensity from 2(1%) to 254(100%)
pwm2 = 00
t1 = 01
t2 = 03
reps = 05
vib = 01
alt = 00 # althernate eyes
"""
command = "LIVEMODE_SENDBYTES"
retries = 15
retry_ms = 111
LIVECMD_FLASHLEDS = 4
i1 = self.dec2hex(LIVECMD_FLASHLEDS, pad=2)
i2 = self.dec2hex(rgb1[0], pad=2)
i3 = self.dec2hex(rgb1[1], pad=2)
i4 = self.dec2hex(rgb1[2], pad=2)
i5 = self.dec2hex(rgb2[0], pad=2)
i6 = self.dec2hex(rgb2[1], pad=2)
i7 = self.dec2hex(rgb2[2], pad=2)
i8 = self.dec2hex(pwm1, pad=2)
i9 = self.dec2hex(pwm2, pad=2)
i10 = self.dec2hex(t1, pad=2)
i11 = self.dec2hex(t2, pad=2)
i12 = self.dec2hex(reps, pad=2)
i13 = self.dec2hex(vib, pad=2)
i14 = self.dec2hex(alt, pad=2)
s = f"""{command} {retries} {self.msgn} {retry_ms} {i1}-{i2}-{i3}-{i4}\
-{i5}-{i6}-{i7}-{i8}-{i9}-{i10}-{i11}-{i12}-{i13}-{i14}\r\n"""
# print(s)
self.socket.sendString(s)
self.msgn += 1