133 linhas
4.3 KiB
Python
Arquivo Executável
133 linhas
4.3 KiB
Python
Arquivo Executável
from threading import Thread
|
|
import socket, select, struct, time
|
|
import plugin_interface as plugintypes
|
|
|
|
# Simple TCP server to "broadcast" data to clients, handling disconnections. The binary format uses network byte order (i.e., big-endian), float32
|
|
|
|
# TODO: does not listen for anything at the moment, could use it to set options
|
|
|
|
# Handling new client in separate thread
|
|
class MonitorStreamer(Thread):
    """Background thread that periodically polls a streamer for connections.

    On every iteration the thread calls ``streamer.check_connections()`` and
    then sleeps for ``interval`` seconds.

    Args:
        streamer: object exposing a ``check_connections()`` method
            (e.g. a TCP streamer plugin instance).
        interval: pause, in seconds, between two polls. Defaults to 1,
            which is the value that used to be hard-coded.
    """

    def __init__(self, streamer, interval=1):
        Thread.__init__(self)
        # bind to the Streamer entity that will be polled
        self.server = streamer
        # polling period in seconds
        self.interval = interval
        # plain flag read by run(); cleared by stop()
        self._keep_running = True

    def stop(self):
        """Ask the polling loop to exit after its current iteration."""
        self._keep_running = False

    def run(self):
        """Poll the streamer until ``stop()`` is called (or the process dies)."""
        # NOTE: check_connections() runs in this thread while the streamer is
        # used from another one; the streamer is responsible for coping with
        # that (the original code carried a "use a lock?" FIXME here).
        while self._keep_running:
            self.server.check_connections()
            time.sleep(self.interval)
|
|
|
|
|
|
class StreamerTCPServer(plugintypes.IPluginExtended):
    """Relay OpenBCI values to TCP clients.

    The server is write-only: it accepts incoming connections but never reads
    from them. Each sample is sent to every connected client, either as a
    text line (``as_string=True``) or as packed float32 values in network
    (big-endian) byte order, one value per channel.

    Args:
        ip: IP address the server binds to.
        port: Port the server listens on.
    """

    def __init__(self, ip='localhost', port=12345):
        # list of socket clients
        self.CONNECTION_LIST = []
        # listening socket; created by initialize(), reset by deactivate()
        self.server_socket = None
        # connection infos
        self.ip = ip
        self.port = port

    # From IPlugin
    def activate(self):
        """Apply the optional ``[ip [port]]`` arguments and start serving.

        Reads ``self.args`` (expected to be set before this call), opens the
        listening socket and launches the daemon thread that polls for new
        clients.
        """
        if len(self.args) > 0:
            self.ip = self.args[0]
        if len(self.args) > 1:
            self.port = int(self.args[1])

        # init network
        print("Selecting raw TCP streaming. IP: " + self.ip + ", port: " + str(self.port))
        self.initialize()

        # init the daemon that monitors connections
        self.monit = MonitorStreamer(self)
        self.monit.daemon = True
        # launch monitor
        self.monit.start()

    def initialize(self):
        """Create, bind and start listening on the server socket.

        Raises:
            socket.error: if the address cannot be bound; the half-configured
                socket is closed before the error is re-raised.
        """
        listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            # NOTE(review): the original author observed that this option
            # "has no effect"; the cause is not visible from this file.
            listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            listener.bind((self.ip, self.port))
            listener.listen(1)
        except Exception:
            # do not leak the descriptor when bind()/listen() fails
            listener.close()
            raise
        self.server_socket = listener
        print("Server started on port " + str(self.port))

    # From Streamer, to be called each time we're willing to accept new connections
    def check_connections(self):
        """Accept a pending client connection, if any, without blocking.

        Only the listening socket is polled: incoming data from clients is
        deliberately ignored. Does nothing when the server is not running.
        """
        server = self.server_socket
        if server is None:
            return
        try:
            # zero timeout: just peek at whether a connection is pending
            readable, _, _ = select.select([server], [], [], 0)
        except (select.error, socket.error, ValueError):
            # the socket was closed (e.g. by deactivate()) while we polled
            return
        for _ready in readable:
            try:
                client, addr = server.accept()
            except socket.error:
                break
            self.CONNECTION_LIST.append(client)
            print("Client (%s, %s) connected" % addr)

    # From IPlugin: close sockets, send message to client
    def deactivate(self):
        """Tell every client we are closing, then close all sockets."""
        # detach the lists first so later calls see a clean, empty state
        clients, self.CONNECTION_LIST = self.CONNECTION_LIST, []
        for sock in clients:
            try:
                # sockets carry bytes, not text
                sock.sendall(b"closing!\n")
            except socket.error:
                # at this point don't bother if message not sent
                pass
            finally:
                # always release the socket, even when the farewell failed
                sock.close()
        # close server socket
        server, self.server_socket = self.server_socket, None
        if server is not None:
            server.close()

    @staticmethod
    def _encode(values, as_string):
        """Return the bytes to transmit for one sample's channel values."""
        if as_string:
            # debug format, e.g. "[34.45, 30.4, -38.0]" followed by a newline
            return (str(values) + "\n").encode("utf-8")
        # network endian (big) and float (float32), one value per channel
        return struct.pack('!%sf' % len(values), *values)

    def __call__(self, sample, as_string=False):
        """Broadcast the channel values of ``sample`` to all clients.

        Args:
            sample: object whose ``channel_data`` attribute holds the values.
            as_string: mainly for debug; send the values as a text line
                instead of packed float32.

        Clients whose socket raises an error during the send are closed and
        removed from the connection list.
        """
        values = sample.channel_data
        # snapshot: the monitor thread may append new clients meanwhile
        clients = list(self.CONNECTION_LIST)
        if not clients:
            return

        # encode once for everybody; bad data must not disconnect the clients
        try:
            payload = self._encode(values, as_string)
        except (struct.error, TypeError, OverflowError) as err:
            print("Cannot encode sample, skipping it: " + str(err))
            return

        # save sockets that are closed to remove them later on
        outdated_list = []
        for sock in clients:
            try:
                # sendall() keeps writing until the whole payload went through
                sock.sendall(payload)
            except socket.error:
                # an error may only show up on the second write to a closed socket
                print("Something bad happened, will close socket")
                outdated_list.append(sock)

        # now that the iteration is over, remove outdated sockets, if any
        for bad_sock in outdated_list:
            print("Removing socket...")
            try:
                self.CONNECTION_LIST.remove(bad_sock)
            except ValueError:
                # already dropped from the list (e.g. by deactivate())
                pass
            # not very costly to be polite
            bad_sock.close()

    def show_help(self):
        """Print the optional plugin arguments."""
        print(
            "Optional arguments: [ip [port]]\n"
            "\t ip: target IP address (default: 'localhost')\n"
            "\t port: target port (default: 12345)"
        )
|