From d7463a8fe47ef0bf4d7b294d59d6672bb42c4a95 Mon Sep 17 00:00:00 2001 From: Kai Junge <kai.junge@epfl.ch> Date: Fri, 15 Apr 2022 17:08:49 +0200 Subject: [PATCH] Tested on windows side for UNO and NANO EVERY. Deleted a few redundant files and organized for release.@ --- comms_wrapper.py | 111 ++++++++-------- comms_wrapper_new.py | 252 ----------------------------------- comms_wrapper_new2.py | 270 ------------------------------------- comms_wrapper_original.py | 273 -------------------------------------- test_new.py | 5 +- test_original.py | 40 ------ test_windows.py | 28 ---- 7 files changed, 57 insertions(+), 922 deletions(-) delete mode 100644 comms_wrapper_new.py delete mode 100644 comms_wrapper_new2.py delete mode 100644 comms_wrapper_original.py delete mode 100644 test_original.py delete mode 100644 test_windows.py diff --git a/comms_wrapper.py b/comms_wrapper.py index 978bf14..4a048d8 100644 --- a/comms_wrapper.py +++ b/comms_wrapper.py @@ -19,7 +19,6 @@ class Arduino: # Communication self._rawReceivedMessage = None self.receivedMessages = {} - self.messageNames = None self.arduino = None self.handshakeStatus = False self.connectionStatus = False @@ -35,7 +34,7 @@ class Arduino: try: self._rawReceivedMessage = self.arduino.readline().decode('utf-8')[:-2] except: - time.sleep(0.000001) + pass def _startReadingThread(self): self.__thread = Thread(target=self._serial_readline) @@ -48,14 +47,6 @@ class Arduino: self.arduino.write(bytes(msg, 'utf-8')) - def _add_new_message_name(self, msgName): - if msgName in self.receivedMessages: - print("!! Message name ", msgName, " already exists so it cannot be added. !!") - else: - self.receivedMessages[msgName] = None - self.messageNames = list(self.receivedMessages.keys()) - - def _connect_to_arduino(self): # Connect to the arduino device try: @@ -74,9 +65,10 @@ class Arduino: return False - def _disect_and_save_message(self, msg): + def _disect_and_save_message(self): receivedMessageTemp = copy.deepcopy(self.receivedMessages) - + msg = copy.deepcopy(self._rawReceivedMessage) + if msg[-2] != ":": return False @@ -87,12 +79,9 @@ class Arduino: msgName = singleMsg.split(";")[0] msgPayload = singleMsg.split(";")[1] - if msgName == "e*p": + if msgName == "echo*": self._echo_python_msg = msgPayload else: - if msgName not in self.messageNames: - return False - receivedMessageTemp[msgName] = msgPayload else: @@ -105,17 +94,9 @@ class Arduino: self.receivedMessages = receivedMessageTemp return True - + # Public methods - def define_message_names(self, msgNames): - if type(msgNames) == list: - for msg in msgNames: - self._add_new_message_name(msg) - else: - print("Argument should be of type 'list'") - - def connect_and_handshake(self): # Connect to the arduino device @@ -137,22 +118,21 @@ class Arduino: self.arduino.reset_input_buffer() self.arduino.reset_output_buffer() - while time.time() - timeoutTimer < handshakeTimeoutSec: self._serial_write("handshake1\n") - time.sleep(0.1) if self._rawReceivedMessage == "handshake2": self.handshakeStatus = True break if self.handshakeStatus: - # while 1: - # self.receive_message() - # if self._echo_python_msg == "NO_PYTHON_MESSAGE": - # break + timeoutTimer = time.time() + while time.time() - timeoutTimer < handshakeTimeoutSec: + self.receive_message() + if self._echo_python_msg == "NO_PYTHON_MESSAGE": + break + time.sleep(0.5) print("Successfull handshake with " + self.descriptiveDeviceName) - time.sleep(1) else: print("!! Handshake failed with " + self.descriptiveDeviceName + " !!") @@ -164,7 +144,19 @@ class Arduino: def send_message(self, msg): - self._serial_write( msg + "\n") + # If we are sending multiple messages + if type(msg) == list: + payload = "" + for value in msg: + payload += str(value) + payload += "@" + payload = payload[:-1] + + # If we are sending a single message + else: + payload = str(msg) + + self._serial_write(payload + "\n") def receive_message(self, printOutput = False, verbose = False): @@ -174,31 +166,39 @@ class Arduino: else: isMessageValid = True msg = self._rawReceivedMessage - # sanity check 1: check if ends of the message are < and > + try: + # sanity check 1: check if ends of the message are < and > if msg[0] == "<" and msg[-1] == ">": pass + + elif msg[:6] == "<echo*": + pass + else: isMessageValid = False + except: isMessageValid = False - if isMessageValid: - isMessageValid = self._disect_and_save_message(msg) - else: - pass - if printOutput: - if verbose: - print("----------------------") - print("Raw message received on python side: ", self._rawReceivedMessage) - print("Messege received from the arduino: ", self.receivedMessages) - print("Python message stored on", self.descriptiveDeviceName, ": ", self._echo_python_msg, "\n") + if isMessageValid: + isMessageValid = self._disect_and_save_message() - else: - print("Messege received from the arduino: ", self.receivedMessages) + if printOutput: + if isMessageValid: + if verbose: + print("----------------------") + print("Raw message received on python side: ", self._rawReceivedMessage) + print("Messege received from the arduino: ", self.receivedMessages) + print("Python message stored on", self.descriptiveDeviceName, ": ", self._echo_python_msg, "\n") + + else: + print("Messege received from the arduino: ", self.receivedMessages) + else: + print("Message from arduino is somehow not valid") time.sleep(0.0001) - return True + return isMessageValid def current_status(self): @@ -207,8 +207,7 @@ class Arduino: "Baudrate: ": self.baudrate, "Portname: ": self.portName, "Connection: ": self.connectionStatus, - "Handshake: ": self.handshakeStatus, - "Message names: ": self.messageNames + "Handshake: ": self.handshakeStatus } return status @@ -216,20 +215,18 @@ class Arduino: def debug(self, verbose = False): if verbose: - self.receive_message(printOutput=True, verbose = True) - + if not self.receive_message(printOutput=True, verbose = True): + print("Message from arduino is somehow not valid") + print("Current status of this device:") print(self.current_status()) else: print("----------------------") - self.receive_message(printOutput=True) - print("Python message stored on", self.descriptiveDeviceName, ": ", self._echo_python_msg, "\n") - + if not self.receive_message(printOutput=True): + print("Message from arduino is somehow not valid") - for key, value in self.receivedMessages.items(): - if value is None: - print("Check if message names: ", key, " agrees with the arduino side") + print("Python message stored on", self.descriptiveDeviceName, ": ", self._echo_python_msg, "\n") ######## diff --git a/comms_wrapper_new.py b/comms_wrapper_new.py deleted file mode 100644 index afcf9cb..0000000 --- a/comms_wrapper_new.py +++ /dev/null @@ -1,252 +0,0 @@ -import serial -import serial.tools.list_ports -from threading import Thread -import time -from pynput import keyboard -import copy - -######## -### Arduino communication -######## - -class Arduino: - def __init__(self, descriptiveDeviceName, portName, baudrate): - # About the device - self.descriptiveDeviceName = descriptiveDeviceName - self.portName = portName - self.baudrate = baudrate - - # Communication - self._rawReceivedMessage = None - self.receivedMessages = {} - self.arduino = None - self.handshakeStatus = False - self.connectionStatus = False - self.newMsgRecieved = False - - # testing new comms - self._rawOrganizedMessages = {} - self._n_arduino_to_python = None - self._n_python_to_arduino = None - self._timer = None - self._prev_payload = None - - # Threading - self.__thread = None - - # Private methods - def _serial_readline(self): - while 1: - try: - self._rawReceivedMessage = self.arduino.readline().decode('utf-8')[:-2] - - msg = copy.deepcopy(self._rawReceivedMessage) - temp = copy.deepcopy(self._rawOrganizedMessages) - - if msg[0] == "<" and msg[-1] == ">": - msg = msg[1:-1] - - msgName = msg.split(";")[0] - msgPayload = msg.split(";")[1] - - temp[msgName] = msgPayload - - if len(temp) - 1 <= self._n_arduino_to_python: - self._rawOrganizedMessages = temp - except: - time.sleep(0.000001) - - def _startReadingThread(self): - self.__thread = Thread(target=self._serial_readline) - self.__thread.daemon = True - self.__thread.start() - - def _serial_write(self, msg): - if self.arduino is not None: - self.arduino.write(bytes(msg, 'utf-8')) - - def _connect_to_arduino(self): - # Connect to the arduino device - try: - self.arduino = serial.Serial(port=self.portName, baudrate=self.baudrate) - - # toggle dtr to reset the arduino - self.arduino.dtr = True - self.arduino.dtr = False - - self.connectionStatus = True - - print("Successfully connected to " + self.descriptiveDeviceName) - return True - except: - print("!! Cannot connect to " + self.descriptiveDeviceName + " !!") - return False - - # Public methods - def define_number_of_messages(self, _n_arduino_to_python, _n_python_to_arduino): - self._n_arduino_to_python = _n_arduino_to_python - self._n_python_to_arduino = _n_python_to_arduino - - def connect_and_handshake(self): - # Connect to the arduino device - - if self._connect_to_arduino(): - pass - else: - return False - - # Start the reading thread - self._startReadingThread() - - # Wait for a bit for the arduino to initialise nicely - time.sleep(0.5) - - # Conduct the handshake process - timeoutTimer = time.time() - handshakeTimeoutSec = 5 - - # Conduct handshake - while time.time() - timeoutTimer < handshakeTimeoutSec: - self._serial_write("handshake1\n") - #time.sleep(0.1) - - if self._rawReceivedMessage == "handshake2": - self.handshakeStatus = True - break - - if self.handshakeStatus: - print("Successfull handshake with " + self.descriptiveDeviceName) - time.sleep(1) - self.timer = time.time() - else: - print("!! Handshake failed with " + self.descriptiveDeviceName + " !!") - - return self.handshakeStatus - - def disconnect_arduino(self): - self.arduino.close() - - def send_message(self, msgArray): - if len(msgArray) != self._n_python_to_arduino: - print("Check if the number of messages sent from the PYTHON side agrees with the ARDUINO side") - else: - payload = "" - for msg in msgArray: - payload += str(msg) - payload += "@" - - payload = "#####" + payload[:-1] + "\n" - - if payload != self._prev_payload: - self._prev_payload = payload - self._serial_write(payload) - - def receive_message(self, printOutput = False, verbose = False): - if not self.handshakeStatus: - print("!! Handshake not completed !!") - return False - - elif self._n_arduino_to_python is None or self._n_python_to_arduino is None: - print("!! Define number of messages !!") - return False - - else: - receivedMsgLength = len(self._rawOrganizedMessages) - 1 - - if receivedMsgLength == self._n_arduino_to_python: - receivedMessagesTemp = copy.deepcopy(self._rawOrganizedMessages) - del receivedMessagesTemp["e*p"] - - # Check if we have a new message - if receivedMessagesTemp == self.receivedMessages: - self.newMsgRecieved = False - else: - self.newMsgRecieved = True - - self.receivedMessages = receivedMessagesTemp - - if printOutput: - if verbose: - print("----------------------") - print("Raw message received on python side: ", self._rawReceivedMessage) - print("Messege received from the arduino: ", self.receivedMessages) - print("Python message stored on", self.descriptiveDeviceName, ": ", self._rawOrganizedMessages["e*p"], "\n") - - else: - print("Messege received from the arduino: ", self.receivedMessages) - - time.sleep(0.0001) - return True - - else: - return False - - def current_status(self): - status = { - "Device name" : self.descriptiveDeviceName, - "Baudrate: ": self.baudrate, - "Portname: ": self.portName, - "Connection: ": self.connectionStatus, - "Handshake: ": self.handshakeStatus, - "Msg Num Arduino -> python: ": self._n_arduino_to_python, - "Msg Num Python -> Arduino: ": self._n_python_to_arduino - } - - return status - - def debug(self, verbose = False): - if verbose: - self.receive_message(printOutput=True, verbose = True) - - print("Current status of this device:") - print(self.current_status(), "\n") - - else: - print("----------------------") - self.receive_message(printOutput=True) - print("Python message stored on", self.descriptiveDeviceName, ": ", self._rawOrganizedMessages["e*p"], "\n") - - if self.timer is not None and time.time() - self.timer > 0.1: - receivedMsgLength = len(self._rawOrganizedMessages) - 1 - if receivedMsgLength != self._n_arduino_to_python: - print("Check if the number of messages sent from the ARDUINO side agrees with the PYTHON side") - -######## -### Key commands -######## - -class Key(): - def __init__(self): - self.keyPressLatching = None - self._keyReleaseLatching = None - self.keyPress = None - self._start_keyboard_listener() - - def _on_press(self, key): - try: - self.keyPressLatching = key.char - self.keyPress = key.char - - except AttributeError: - self.keyPressLatching = key - self.keyPress = key - - - def _on_release(self, key): - try: - self._keyReleaseLatching = key.char - - if self._keyReleaseLatching == self.keyPress: - self.keyPress = None - - except AttributeError: - self._keyReleaseLatching = key - - if self._keyReleaseLatching == self.keyPress: - self.keyPress = None - - - def _start_keyboard_listener(self): - listener = keyboard.Listener(on_press=self._on_press, on_release=self._on_release) - listener.start() - print("keyboard listener started") \ No newline at end of file diff --git a/comms_wrapper_new2.py b/comms_wrapper_new2.py deleted file mode 100644 index 4a048d8..0000000 --- a/comms_wrapper_new2.py +++ /dev/null @@ -1,270 +0,0 @@ -import serial -import serial.tools.list_ports -from threading import Thread -import time -from pynput import keyboard -import copy - -######## -### Arduino communication -######## - -class Arduino: - def __init__(self, descriptiveDeviceName, portName, baudrate): - # About the device - self.descriptiveDeviceName = descriptiveDeviceName - self.portName = portName - self.baudrate = baudrate - - # Communication - self._rawReceivedMessage = None - self.receivedMessages = {} - self.arduino = None - self.handshakeStatus = False - self.connectionStatus = False - self._echo_python_msg = None - self.newMsgRecieved = False - - # Threading - self.__thread = None - - # Private methods - def _serial_readline(self): - while 1: - try: - self._rawReceivedMessage = self.arduino.readline().decode('utf-8')[:-2] - except: - pass - - def _startReadingThread(self): - self.__thread = Thread(target=self._serial_readline) - self.__thread.daemon = True - self.__thread.start() - - - def _serial_write(self, msg): - if self.arduino is not None: - self.arduino.write(bytes(msg, 'utf-8')) - - - def _connect_to_arduino(self): - # Connect to the arduino device - try: - self.arduino = serial.Serial(port=self.portName, baudrate=self.baudrate) - - # toggle dtr to reset the arduino - self.arduino.dtr = True - self.arduino.dtr = False - - self.connectionStatus = True - - print("Successfully connected to " + self.descriptiveDeviceName) - return True - except: - print("!! Cannot connect to " + self.descriptiveDeviceName + " !!") - return False - - - def _disect_and_save_message(self): - receivedMessageTemp = copy.deepcopy(self.receivedMessages) - msg = copy.deepcopy(self._rawReceivedMessage) - - if msg[-2] != ":": - return False - - msg = msg[1:-2] - splitMsg = msg.split(":") - for singleMsg in splitMsg: - if len(singleMsg.split(";")) == 2: - msgName = singleMsg.split(";")[0] - msgPayload = singleMsg.split(";")[1] - - if msgName == "echo*": - self._echo_python_msg = msgPayload - else: - receivedMessageTemp[msgName] = msgPayload - - else: - return False - - if receivedMessageTemp == self.receivedMessages: - self.newMsgRecieved = False - else: - self.newMsgRecieved = True - - self.receivedMessages = receivedMessageTemp - return True - - - # Public methods - def connect_and_handshake(self): - # Connect to the arduino device - - if self._connect_to_arduino(): - pass - else: - return False - - # Start the reading thread - self._startReadingThread() - - # Wait for a bit for the arduino to initialise nicely - time.sleep(0.5) - - # Conduct the handshake process - timeoutTimer = time.time() - handshakeTimeoutSec = 5 - - self.arduino.reset_input_buffer() - self.arduino.reset_output_buffer() - - while time.time() - timeoutTimer < handshakeTimeoutSec: - self._serial_write("handshake1\n") - - if self._rawReceivedMessage == "handshake2": - self.handshakeStatus = True - break - - if self.handshakeStatus: - timeoutTimer = time.time() - while time.time() - timeoutTimer < handshakeTimeoutSec: - self.receive_message() - if self._echo_python_msg == "NO_PYTHON_MESSAGE": - break - time.sleep(0.5) - print("Successfull handshake with " + self.descriptiveDeviceName) - else: - print("!! Handshake failed with " + self.descriptiveDeviceName + " !!") - - return self.handshakeStatus - - - def disconnect_arduino(self): - self.arduino.close() - - - def send_message(self, msg): - # If we are sending multiple messages - if type(msg) == list: - payload = "" - for value in msg: - payload += str(value) - payload += "@" - payload = payload[:-1] - - # If we are sending a single message - else: - payload = str(msg) - - self._serial_write(payload + "\n") - - - def receive_message(self, printOutput = False, verbose = False): - if not self.handshakeStatus: - print("!! Handshake not completed !!") - return False - else: - isMessageValid = True - msg = self._rawReceivedMessage - - try: - # sanity check 1: check if ends of the message are < and > - if msg[0] == "<" and msg[-1] == ">": - pass - - elif msg[:6] == "<echo*": - pass - - else: - isMessageValid = False - - except: - isMessageValid = False - - if isMessageValid: - isMessageValid = self._disect_and_save_message() - - if printOutput: - if isMessageValid: - if verbose: - print("----------------------") - print("Raw message received on python side: ", self._rawReceivedMessage) - print("Messege received from the arduino: ", self.receivedMessages) - print("Python message stored on", self.descriptiveDeviceName, ": ", self._echo_python_msg, "\n") - - else: - print("Messege received from the arduino: ", self.receivedMessages) - else: - print("Message from arduino is somehow not valid") - - time.sleep(0.0001) - return isMessageValid - - - def current_status(self): - status = { - "Device name" : self.descriptiveDeviceName, - "Baudrate: ": self.baudrate, - "Portname: ": self.portName, - "Connection: ": self.connectionStatus, - "Handshake: ": self.handshakeStatus - } - - return status - - - def debug(self, verbose = False): - if verbose: - if not self.receive_message(printOutput=True, verbose = True): - print("Message from arduino is somehow not valid") - - print("Current status of this device:") - print(self.current_status()) - - else: - print("----------------------") - if not self.receive_message(printOutput=True): - print("Message from arduino is somehow not valid") - - print("Python message stored on", self.descriptiveDeviceName, ": ", self._echo_python_msg, "\n") - - -######## -### Key commands -######## - -class Key(): - def __init__(self): - self.keyPressLatching = None - self._keyReleaseLatching = None - self.keyPress = None - self._start_keyboard_listener() - - def _on_press(self, key): - try: - self.keyPressLatching = key.char - self.keyPress = key.char - - except AttributeError: - self.keyPressLatching = key - self.keyPress = key - - - def _on_release(self, key): - try: - self._keyReleaseLatching = key.char - - if self._keyReleaseLatching == self.keyPress: - self.keyPress = None - - except AttributeError: - self._keyReleaseLatching = key - - if self._keyReleaseLatching == self.keyPress: - self.keyPress = None - - - def _start_keyboard_listener(self): - listener = keyboard.Listener(on_press=self._on_press, on_release=self._on_release) - listener.start() - print("keyboard listener started") \ No newline at end of file diff --git a/comms_wrapper_original.py b/comms_wrapper_original.py deleted file mode 100644 index 978bf14..0000000 --- a/comms_wrapper_original.py +++ /dev/null @@ -1,273 +0,0 @@ -import serial -import serial.tools.list_ports -from threading import Thread -import time -from pynput import keyboard -import copy - -######## -### Arduino communication -######## - -class Arduino: - def __init__(self, descriptiveDeviceName, portName, baudrate): - # About the device - self.descriptiveDeviceName = descriptiveDeviceName - self.portName = portName - self.baudrate = baudrate - - # Communication - self._rawReceivedMessage = None - self.receivedMessages = {} - self.messageNames = None - self.arduino = None - self.handshakeStatus = False - self.connectionStatus = False - self._echo_python_msg = None - self.newMsgRecieved = False - - # Threading - self.__thread = None - - # Private methods - def _serial_readline(self): - while 1: - try: - self._rawReceivedMessage = self.arduino.readline().decode('utf-8')[:-2] - except: - time.sleep(0.000001) - - def _startReadingThread(self): - self.__thread = Thread(target=self._serial_readline) - self.__thread.daemon = True - self.__thread.start() - - - def _serial_write(self, msg): - if self.arduino is not None: - self.arduino.write(bytes(msg, 'utf-8')) - - - def _add_new_message_name(self, msgName): - if msgName in self.receivedMessages: - print("!! Message name ", msgName, " already exists so it cannot be added. !!") - else: - self.receivedMessages[msgName] = None - self.messageNames = list(self.receivedMessages.keys()) - - - def _connect_to_arduino(self): - # Connect to the arduino device - try: - self.arduino = serial.Serial(port=self.portName, baudrate=self.baudrate) - - # toggle dtr to reset the arduino - self.arduino.dtr = True - self.arduino.dtr = False - - self.connectionStatus = True - - print("Successfully connected to " + self.descriptiveDeviceName) - return True - except: - print("!! Cannot connect to " + self.descriptiveDeviceName + " !!") - return False - - - def _disect_and_save_message(self, msg): - receivedMessageTemp = copy.deepcopy(self.receivedMessages) - - if msg[-2] != ":": - return False - - msg = msg[1:-2] - splitMsg = msg.split(":") - for singleMsg in splitMsg: - if len(singleMsg.split(";")) == 2: - msgName = singleMsg.split(";")[0] - msgPayload = singleMsg.split(";")[1] - - if msgName == "e*p": - self._echo_python_msg = msgPayload - else: - if msgName not in self.messageNames: - return False - - receivedMessageTemp[msgName] = msgPayload - - else: - return False - - if receivedMessageTemp == self.receivedMessages: - self.newMsgRecieved = False - else: - self.newMsgRecieved = True - - self.receivedMessages = receivedMessageTemp - return True - - - # Public methods - def define_message_names(self, msgNames): - if type(msgNames) == list: - for msg in msgNames: - self._add_new_message_name(msg) - else: - print("Argument should be of type 'list'") - - - def connect_and_handshake(self): - # Connect to the arduino device - - if self._connect_to_arduino(): - pass - else: - return False - - # Start the reading thread - self._startReadingThread() - - # Wait for a bit for the arduino to initialise nicely - time.sleep(0.5) - - # Conduct the handshake process - timeoutTimer = time.time() - handshakeTimeoutSec = 5 - - self.arduino.reset_input_buffer() - self.arduino.reset_output_buffer() - - - while time.time() - timeoutTimer < handshakeTimeoutSec: - self._serial_write("handshake1\n") - time.sleep(0.1) - - if self._rawReceivedMessage == "handshake2": - self.handshakeStatus = True - break - - if self.handshakeStatus: - # while 1: - # self.receive_message() - # if self._echo_python_msg == "NO_PYTHON_MESSAGE": - # break - print("Successfull handshake with " + self.descriptiveDeviceName) - time.sleep(1) - else: - print("!! Handshake failed with " + self.descriptiveDeviceName + " !!") - - return self.handshakeStatus - - - def disconnect_arduino(self): - self.arduino.close() - - - def send_message(self, msg): - self._serial_write( msg + "\n") - - - def receive_message(self, printOutput = False, verbose = False): - if not self.handshakeStatus: - print("!! Handshake not completed !!") - return False - else: - isMessageValid = True - msg = self._rawReceivedMessage - # sanity check 1: check if ends of the message are < and > - try: - if msg[0] == "<" and msg[-1] == ">": - pass - else: - isMessageValid = False - except: - isMessageValid = False - if isMessageValid: - isMessageValid = self._disect_and_save_message(msg) - else: - pass - - if printOutput: - if verbose: - print("----------------------") - print("Raw message received on python side: ", self._rawReceivedMessage) - print("Messege received from the arduino: ", self.receivedMessages) - print("Python message stored on", self.descriptiveDeviceName, ": ", self._echo_python_msg, "\n") - - else: - print("Messege received from the arduino: ", self.receivedMessages) - - time.sleep(0.0001) - return True - - - def current_status(self): - status = { - "Device name" : self.descriptiveDeviceName, - "Baudrate: ": self.baudrate, - "Portname: ": self.portName, - "Connection: ": self.connectionStatus, - "Handshake: ": self.handshakeStatus, - "Message names: ": self.messageNames - } - - return status - - - def debug(self, verbose = False): - if verbose: - self.receive_message(printOutput=True, verbose = True) - - print("Current status of this device:") - print(self.current_status()) - - else: - print("----------------------") - self.receive_message(printOutput=True) - print("Python message stored on", self.descriptiveDeviceName, ": ", self._echo_python_msg, "\n") - - - for key, value in self.receivedMessages.items(): - if value is None: - print("Check if message names: ", key, " agrees with the arduino side") - - -######## -### Key commands -######## - -class Key(): - def __init__(self): - self.keyPressLatching = None - self._keyReleaseLatching = None - self.keyPress = None - self._start_keyboard_listener() - - def _on_press(self, key): - try: - self.keyPressLatching = key.char - self.keyPress = key.char - - except AttributeError: - self.keyPressLatching = key - self.keyPress = key - - - def _on_release(self, key): - try: - self._keyReleaseLatching = key.char - - if self._keyReleaseLatching == self.keyPress: - self.keyPress = None - - except AttributeError: - self._keyReleaseLatching = key - - if self._keyReleaseLatching == self.keyPress: - self.keyPress = None - - - def _start_keyboard_listener(self): - listener = keyboard.Listener(on_press=self._on_press, on_release=self._on_release) - listener.start() - print("keyboard listener started") \ No newline at end of file diff --git a/test_new.py b/test_new.py index bff5083..1938166 100644 --- a/test_new.py +++ b/test_new.py @@ -1,10 +1,10 @@ -from comms_wrapper_new2 import * +from comms_wrapper import * from time import sleep, time def main(): key = Key() - ad = Arduino(descriptiveDeviceName="myArduino", portName="/dev/ttyACM0", baudrate=115200) + ad = Arduino(descriptiveDeviceName="myArduino", portName="COM9", baudrate=115200) ad.connect_and_handshake() input() @@ -23,5 +23,6 @@ def main(): ad.receive_message() print(ad.receivedMessages) #ad.debug() + if __name__ == '__main__': main() \ No newline at end of file diff --git a/test_original.py b/test_original.py deleted file mode 100644 index f9c3849..0000000 --- a/test_original.py +++ /dev/null @@ -1,40 +0,0 @@ -from comms_wrapper_original import * -from time import sleep, time - -def main(): - key = Key() - - - ad = Arduino(descriptiveDeviceName="myArduino", portName="COM", baudrate=115200) - - #ad.define_message_names(["msgA", "msgB"]) - - ad.connect_and_handshake() - - timer = time() - while 1: - - # ad.receive_message() - # print(ad.receivedMessages) - - if key.keyPress == "f": - ad.send_message("apples") - #arduino2.send_message("oranges") - - if key.keyPress == "g": - ad.send_message("bananas") - #arduino2.send_message("grapes") - t = str(round(time() - timer, 5)) - while 1: - if len(t) < 5: - t += "0" - elif len(t) > 5: - t = t[:-1] - else: - break - ad.send_message(str(t)) - - print(ad._rawReceivedMessage) - -if __name__ == '__main__': - main() \ No newline at end of file diff --git a/test_windows.py b/test_windows.py deleted file mode 100644 index 1435ba8..0000000 --- a/test_windows.py +++ /dev/null @@ -1,28 +0,0 @@ -from comms_wrapper_new import * -from time import sleep, time - -def main(): - key = Key() - - arduino1 = Arduino(descriptiveDeviceName="myArduino", portName="COM7", baudrate=115200) - arduino1.define_number_of_messages(0, 1) - arduino1.connect_and_handshake() - - timer = time() - while 1: - - #arduino1.send_message([str(round(time() - timer, 2)) ]) - t = str(round(time() - timer, 5)) - while 1: - if len(t) < 5: - t += "0" - elif len(t) > 5: - t = t[:-1] - else: - break - arduino1.send_message([t]) - - print(arduino1._rawReceivedMessage, arduino1._rawOrganizedMessages) - -if __name__ == '__main__': - main() \ No newline at end of file -- GitLab