Skip to content
Snippets Groups Projects
comms_wrapper_new2.py 7.92 KiB
import serial
import serial.tools.list_ports
from threading import Thread
import time
from pynput import keyboard
import copy

########
### Arduino communication
########

class Arduino: 
    def __init__(self, descriptiveDeviceName, portName, baudrate): 
        # About the device
        self.descriptiveDeviceName = descriptiveDeviceName
        self.portName = portName
        self.baudrate = baudrate
        
        # Communication
        self._rawReceivedMessage = None
        self.receivedMessages = {}
        self.arduino = None
        self.handshakeStatus = False
        self.connectionStatus = False
        self._echo_python_msg = None
        self.newMsgRecieved = False

        # Threading
        self.__thread = None

    # Private methods
    def _serial_readline(self):
        while 1:
            try:
                self._rawReceivedMessage = self.arduino.readline().decode('utf-8')[:-2]
            except:
                pass

    def _startReadingThread(self):
        self.__thread = Thread(target=self._serial_readline)
        self.__thread.daemon = True
        self.__thread.start()


    def _serial_write(self, msg):
        if self.arduino is not None:
            self.arduino.write(bytes(msg, 'utf-8'))


    def _connect_to_arduino(self):
        # Connect to the arduino device
        try: 
            self.arduino = serial.Serial(port=self.portName, baudrate=self.baudrate)

            # toggle dtr to reset the arduino
            self.arduino.dtr = True
            self.arduino.dtr = False

            self.connectionStatus = True
        
            print("Successfully connected to " + self.descriptiveDeviceName)
            return True
        except:
            print("!! Cannot connect to " + self.descriptiveDeviceName + " !!")
            return False
        

    def _disect_and_save_message(self):
        receivedMessageTemp = copy.deepcopy(self.receivedMessages)
        msg = copy.deepcopy(self._rawReceivedMessage)
        
        if msg[-2] != ":":
            return False

        msg = msg[1:-2]
        splitMsg = msg.split(":")
        for singleMsg in splitMsg:
            if len(singleMsg.split(";")) == 2:
                msgName = singleMsg.split(";")[0]
                msgPayload = singleMsg.split(";")[1]

                if msgName == "echo*":
                        self._echo_python_msg = msgPayload
                else:
                    receivedMessageTemp[msgName] = msgPayload

            else:
                return False
        
        if receivedMessageTemp == self.receivedMessages: 
            self.newMsgRecieved = False
        else: 
            self.newMsgRecieved = True
        
        self.receivedMessages = receivedMessageTemp
        return True
        
        
    # Public methods
    def connect_and_handshake(self): 
        # Connect to the arduino device

        if self._connect_to_arduino():
            pass
        else:
            return False

        # Start the reading thread
        self._startReadingThread()
        
        # Wait for a bit for the arduino to initialise nicely
        time.sleep(0.5)

        # Conduct the handshake process
        timeoutTimer = time.time()
        handshakeTimeoutSec = 5

        self.arduino.reset_input_buffer()
        self.arduino.reset_output_buffer()

        while time.time() - timeoutTimer < handshakeTimeoutSec:
            self._serial_write("handshake1\n")

            if self._rawReceivedMessage == "handshake2":
                self.handshakeStatus = True
                break

        if self.handshakeStatus: 
            timeoutTimer = time.time()
            while time.time() - timeoutTimer < handshakeTimeoutSec:
                self.receive_message()
                if self._echo_python_msg == "NO_PYTHON_MESSAGE":
                    break
            time.sleep(0.5)
            print("Successfull handshake with " + self.descriptiveDeviceName)
        else:
            print("!! Handshake failed with " + self.descriptiveDeviceName + " !!")

        return self.handshakeStatus


    def disconnect_arduino(self):
        self.arduino.close()


    def send_message(self, msg):
        # If we are sending multiple messages
        if type(msg) == list:
            payload = ""
            for value in msg:
                payload += str(value)
                payload += "@" 
            payload = payload[:-1]

        # If we are sending a single message
        else:
            payload = str(msg)

        self._serial_write(payload + "\n")


    def receive_message(self, printOutput = False, verbose = False):
        if not self.handshakeStatus:
            print("!! Handshake not completed !!")
            return False
        else:
            isMessageValid = True
            msg = self._rawReceivedMessage
            
            try:
                # sanity check 1: check if ends of the message are < and >
                if msg[0] == "<" and msg[-1] == ">":
                    pass

                elif msg[:6] == "<echo*":
                    pass

                else:
                    isMessageValid = False

            except:
                isMessageValid = False
            
            if isMessageValid:
                isMessageValid = self._disect_and_save_message()

                if printOutput:
                    if isMessageValid: 
                        if verbose: 
                            print("----------------------")
                            print("Raw message received on python side: ", self._rawReceivedMessage)
                            print("Messege received from the arduino: ", self.receivedMessages)
                            print("Python message stored on", self.descriptiveDeviceName, ": ", self._echo_python_msg, "\n")

                        else:
                            print("Messege received from the arduino: ", self.receivedMessages)
                    else: 
                        print("Message from arduino is somehow not valid")

            time.sleep(0.0001)
            return isMessageValid


    def current_status(self):
        status = {
            "Device name" : self.descriptiveDeviceName, 
            "Baudrate: ": self.baudrate,
            "Portname: ": self.portName, 
            "Connection: ": self.connectionStatus, 
            "Handshake: ": self.handshakeStatus
            }

        return status
    

    def debug(self, verbose = False):
        if verbose:   
            if not self.receive_message(printOutput=True, verbose = True):
                print("Message from arduino is somehow not valid")
            
            print("Current status of this device:")
            print(self.current_status())
            
        else:
            print("----------------------")
            if not self.receive_message(printOutput=True):
                print("Message from arduino is somehow not valid")

            print("Python message stored on", self.descriptiveDeviceName, ": ", self._echo_python_msg, "\n")


########
### Key commands
########

class Key():
    def __init__(self):                
        self.keyPressLatching = None
        self._keyReleaseLatching = None
        self.keyPress = None
        self._start_keyboard_listener()

    def _on_press(self, key):
        try:
            self.keyPressLatching = key.char
            self.keyPress = key.char
            
        except AttributeError:
            self.keyPressLatching = key
            self.keyPress = key
            

    def _on_release(self, key):
        try:
            self._keyReleaseLatching = key.char

            if self._keyReleaseLatching == self.keyPress: 
                self.keyPress = None

        except AttributeError:
            self._keyReleaseLatching = key

            if self._keyReleaseLatching == self.keyPress: 
                self.keyPress = None


    def _start_keyboard_listener(self):  
        listener = keyboard.Listener(on_press=self._on_press, on_release=self._on_release)
        listener.start()
        print("keyboard listener started")