From ec89a6499b5ef3a608a0698ef982ea2d736daa21 Mon Sep 17 00:00:00 2001 From: hamuchiwa Date: Sun, 10 Apr 2016 12:51:03 -0400 Subject: [PATCH] Add video/sensor streaming test files --- stream_server_test.py | 44 +++++++++++++++++++++++++++++++++++++++ ultrasonic_server_test.py | 35 +++++++++++++++++++++++++++++++ 2 files changed, 79 insertions(+) create mode 100644 stream_server_test.py create mode 100644 ultrasonic_server_test.py diff --git a/stream_server_test.py b/stream_server_test.py new file mode 100644 index 0000000..37e8c9b --- /dev/null +++ b/stream_server_test.py @@ -0,0 +1,44 @@ +__author__ = 'zhengwang' + +import numpy as np +import cv2 +import socket + + +class VideoStreamingTest(object): + def __init__(self): + + self.server_socket = socket.socket() + self.server_socket.bind(('192.168.1.100', 8000)) + self.server_socket.listen(0) + self.connection, self.client_address = self.server_socket.accept() + self.connection = self.connection.makefile('rb') + self.streaming() + + def streaming(self): + + try: + print "Connection from: ", self.client_address + print "Streaming..." + print "Press 'q' to exit" + + stream_bytes = ' ' + while True: + stream_bytes += self.connection.read(1024) + first = stream_bytes.find('\xff\xd8') + last = stream_bytes.find('\xff\xd9') + if first != -1 and last != -1: + jpg = stream_bytes[first:last + 2] + stream_bytes = stream_bytes[last + 2:] + #image = cv2.imdecode(np.fromstring(jpg, dtype=np.uint8), cv2.CV_LOAD_IMAGE_GRAYSCALE) + image = cv2.imdecode(np.fromstring(jpg, dtype=np.uint8), cv2.CV_LOAD_IMAGE_UNCHANGED) + cv2.imshow('image', image) + + if cv2.waitKey(1) & 0xFF == ord('q'): + break + finally: + self.connection.close() + self.server_socket.close() + +if __name__ == '__main__': + VideoStreamingTest() \ No newline at end of file diff --git a/ultrasonic_server_test.py b/ultrasonic_server_test.py new file mode 100644 index 0000000..f4aaf21 --- /dev/null +++ b/ultrasonic_server_test.py @@ -0,0 +1,35 @@ +__author__ = 'zhengwang' + +import socket +import time + + +class SensorStreamingTest(object): + def __init__(self): + + self.server_socket = socket.socket() + self.server_socket.bind(('192.168.1.100', 8002)) + self.server_socket.listen(0) + self.connection, self.client_address = self.server_socket.accept() + self.streaming() + + def streaming(self): + + try: + print "Connection from: ", self.client_address + print "Press 'q' to exit" + start = time.time() + + while True: + sensor_data = float(self.connection.recv(1024)) + print "Distance: %0.1f cm" % sensor_data + + # testing for 10 seconds + if time.time() - start > 10: + break + finally: + self.connection.close() + self.server_socket.close() + +if __name__ == '__main__': + SensorStreamingTest() \ No newline at end of file