codepad
[
create a new paste
]
login
|
about
Language:
C
C++
D
Haskell
Lua
OCaml
PHP
Perl
Plain Text
Python
Ruby
Scheme
Tcl
# -*- coding: utf-8 -*- # # Class: USBinterface - Interface between RapsberryPi and Arduino # # Version: 0.1 # Creator: meigrafd # Copyright (C) 2016 by meiraspi@gmail.com published under the MIT License # import time import serial import logging class interfaceError(Exception): def __init__(self, message): self.message = message def __str__(self): return repr(self.message) class USBinterfaceClass(object): """ Methods implementing an interface between RaspberryPi and an Arduino """ # Interface control and information attributes returnStatus = 0 # Status of call (0=Good, -1=Error) returnValue = "" # Return value (set as appropriate) returnDebug = "" # Return message (set as appropriate) returnLastCall = "" # Last method called from class initFailed = False # Set to true if our _init_ failed (e.g. serial port issue) commRetries = 30 # Number of communication retries commTimeout = 1 # Timeout wait on communication retries traceFile = "" # Pointer for a trace file lastTime = "" # Last time stamp shared with the Arduino debugOn = False # Print error messages traceOn = False # Log all interactions exceptionOn = False # Throw exceptions on error responseRetries = 3 # Number of times to retry from a response mess up responseRetry = 0 # Current retry dialogDebug = False # Set to true for debug statements across response # ***** # Define the pins available for our Arduino board # ***** board = { 'pins' : tuple(x for x in range(54)), 'digital' : tuple(x for x in range(54)), 'analog' : tuple(x for x in range(16)), 'pwm' : tuple(x for x in range(2,14)), 'disabled' : (0, 1, 14, 15) # Rx, Tx, Crystal } # Initialize the serial port on instantiation and close on deletion def __init__(self, port="/dev/ttyACM0", baudrate=38400): try: self.serial = serial.Serial(port, baudrate, timeout=self.commTimeout) except: self.initFailed = True self.returnDebug = "Error opening serial port '" + port + "'" return self.__handleRaiseException() print("Initialized USBinterfaceClass") def __del__(self): if not self.initFailed and self.serial: self.serial.close() def printD(self, line): if self.dialogDebug: print(line) # Internal methods to send a command and receive a response def __sendCommand(self, command): # Attempt to get a good handshake and then send the command sendingCommand = True retries = self.commRetries while sendingCommand: # Establish a handshake self.serial.write(("%").encode("ISO-8859-1")) self.printD("sendCommand: sent %") test = self.serial.read().decode("ISO-8859-1") if test == "#": self.printD("sendCommand: got #") #self.printD("sendCommand: good handshake") # With a good handshake send the command self.serial.write((command).encode("ISO-8859-1")) # Terminate the command self.serial.write(("@").encode("ISO-8859-1")) self.printD("sendCommand: sent " + str(command) + "@") # Done waiting to send the command sendingCommand = False retries = retries - 1 if retries == 0: self.returnDebug = self.__handleTraceDebug("timeout on handshake with Arduino") self.returnStatus = True self.returnValue = True return self.__handleRaiseException() # With a good handshake try to get a response to our command retries = self.commRetries while retries > 0: # Get the response from the Arduino response = self.__getResponse(command) # No response so retry if response == -999: self.printD("sendCommand: no response - retry") retries = retries - 1 # Got a response so back we go elif response == True or response == False: self.printD("sendCommand: got response") self.returnStatus = response return self.returnValue retries = retries - 1 self.returnDebug = self.__handleTraceDebug("timeout on command response from Arduino") self.returnStatus = True self.returnValue = True return self.__handleRaiseException() def __getResponse(self, command): returnLine = (self.serial.readline().strip()).decode("ISO-8859-1") # Do some cleanup on the returned line if returnLine == "": self.printD("getResponse: blank line") return -1 returnLine = returnLine.replace("#","") # First character should be a valid return code, if not we will ignore this line if returnLine.startswith("0"): response = False if returnLine.startswith("1"): response = True # Split off the return values if any returnArgs = returnLine.split(",") try: self.returnValue = returnArgs[1].strip() except IndexError: self.returnValue = 0 try: self.returnDebug = returnArgs[2] except IndexError: self.returnDebug = "" try: self.__handleTraceDebug("returned " + str(response) + " value " + str(self.returnValue) + " " + self.returnDebug) except: if self.responseRetry < self.responseRetries: self.responseRetry += 1 self.printD("getResponse: response retry " + str(responseRetry) + " of " + str(responseRetries)) self.__sendCommand(command) else: self.printD("getResponse: giving up on response retries ") self.responseRetry = 0 return self.__handleRaiseException() self.printD("getResponse: valid response of " + str(response) + ": " + str(returnLine)) if response: return self.__handleRaiseException() else: return response # ***** # Raise exception intercepter # ***** def __handleRaiseException(self): if self.exceptionOn: raise interfaceError(self.returnDebug) else: return True # ***** # Trace/debug intercepter # ***** def __handleTraceDebug(self, message): if message[0] == "@": self.returnLastCall = message if self.debugOn: print(message) if self.traceOn: self.traceFile.write(self.lastTime + " - " + message) self.traceFile.write("\n") return message #------------------------------------------------------------------- # COMMAND METHODS #------------------------------------------------------------------- # ***** # (P)in mode commands # ***** def pinModeInput(self, pin): self.setLastTime() if pin not in self.board['pins']: self.returnDebug = "Invalid pin (" + str(pin) + ") specified for 'Mode Input'" return -1 self.__handleTraceDebug("@pinModeInput for pin " + str(pin)) return self.__sendCommand("PI" + str(pin)) def pinModeOutput(self, pin): self.setLastTime() if pin not in self.board['pins']: self.returnDebug = "Invalid pin (" + str(pin) + ") specified for 'Mode Output'" return -1 self.__handleTraceDebug("@pinModeOutput for pin " + str(pin)) return self.__sendCommand("PO" + str(pin)) # ***** # (D)igital commands # ***** def setHigh(self, pin): self.setLastTime() if pin not in self.board['digital']: self.returnDebug = "Invalid pin (" + str(pin) + ") specified for 'Set High'" return -1 self.__handleTraceDebug("@setHigh for pin " + str(pin)) return self.__sendCommand("DH" + str(pin)) def setLow(self, pin): self.setLastTime() if pin not in self.board['digital']: self.returnDebug = "Invalid pin (" + str(pin) + ") specified for 'Set Low'" return -1 self.__handleTraceDebug("@setLow for pin " + str(pin)) return self.__sendCommand("DL" + str(pin)) def digitalRead(self, pin): self.setLastTime() if pin not in self.board['digital']: self.returnDebug = "Invalid pin (" + str(pin) + ") specified for 'Digital Read'" return -1 self.__handleTraceDebug("@digitalRead for pin " + str(pin)) return self.__sendCommand("DR" + str(pin)) def digitalWrite(self, pin, value): self.setLastTime() if pin not in self.board['digital']: self.returnDebug = "Invalid pin (" + str(pin) + ") specified for 'Digital Write'" return -1 self.__handleTraceDebug("@digitalWrite for pin " + str(pin)) return self.__sendCommand("DW" + str(pin) + "," + str(value)) # ***** # (A)nalog commands # ***** def analogRead(self, pin): self.setLastTime() if pin not in self.board['analog'] or pin in self.board['disabled']: self.returnDebug = "Invalid pin (" + str(pin) + ") specified for 'Analog Read'" return -1 self.__handleTraceDebug("@analogRead for pin " + str(pin)) return self.__sendCommand("AR" + str(pin)) # ***** # PWM commands - set as (A)nalog # ***** def pwmWrite(self, pin, value): self.setLastTime() if pin not in self.board['pwm'] or pin in self.board['disabled']: self.returnDebug = "Invalid pin (" + str(pin) + ") specified for 'PWM Write'" return -1 self.__handleTraceDebug("@pwmWrite for pin " + str(pin) + " value " + str(value)) return self.__sendCommand("AW" + str(pin) + "," + str(value)) def pwmRead(self, pin): self.setLastTime() if pin not in self.board['pwm'] or pin in self.board['disabled']: self.returnDebug = "Invalid pin (" + str(pin) + ") specified for 'PWM Read'" return -1 self.__handleTraceDebug("@pwmRead for pin " + str(pin)) return self.__sendCommand("AR" + str(pin)) def pwmPulseLow(self, pin, trigger): self.setLastTime() if pin not in self.board['pwm'] or pin in self.board['disabled']: self.returnDebug = "Invalid pin (" + str(pin) + ") specified for 'PWM Pulse Low'" return -1 if trigger not in self.board['digital']: self.returnDebug = "Invalid pin (" + str(pin) + ") specified for trigger in 'PWM Pulse Low'" return -1 self.__handleTraceDebug("@pwmPulseLow for pin " + str(pin) + " trigger " + str(trigger)) return self.__sendCommand("AL" + str(pin) + "," + str(trigger)) def pwmPulseHigh(self, pin, trigger): self.setLastTime() if pin not in self.board['pwm'] or pin in self.board['disabled']: self.returnDebug = "Invalid pin (" + str(pin) + ") specified for 'PWM Pulse High'" return -1 if trigger not in self.board['digital']: self.returnDebug = "Invalid pin (" + str(pin) + ") specified for trigger in 'PWM Pulse High'" return -1 self.__handleTraceDebug("@pwmPulseHigh for pin " + str(pin) + " trigger " + str(trigger)) return self.__sendCommand("AH" + str(pin) + "," + str(trigger)) def USping(self, echo, trigger): self.setLastTime() if echo not in self.board['pwm'] or echo in self.board['disabled']: self.returnDebug = "Invalid pin (" + str(echo) + ") specified for 'US ping'" return -1 if trigger not in self.board['digital']: self.returnDebug = "Invalid pin (" + str(trigger) + ") specified for trigger in 'US ping'" return -1 self.__handleTraceDebug("@USping for echo " + str(echo) + " trigger " + str(trigger)) return self.__sendCommand("AP" + str(echo) + "," + str(trigger)) # ***** # (S)ervo commands # ***** def attachServo(self, servo, pin): self.setLastTime() if pin not in self.board['digital']: self.returnDebug = "Invalid pin (" + str(pin) + ") specified for 'Attach Servo'" return -1 self.__handleTraceDebug("@attachServo " + str(servo) + " pin " + str(pin)) if servo < 0 or servo > 4: self.returnDebug = self.__handleTraceDebug("Attached servo must be 1-4") return True return self.__sendCommand("SA" + str(servo) + "," + str(pin)) def commandServo(self, servo, command): self.setLastTime() self.__handleTraceDebug("@commandServo " + str(servo) + " command " + str(command)) if servo < 0 or servo > 4: self.returnDebug = self.__handleTraceDebug("Commanded servo must be 1-4") return True return self.__sendCommand("SC" + str(servo) + "," + str(command)) def detachServo(self, servo): self.setLastTime() self.__handleTraceDebug("@detachServo " + str(servo)) if servo < 0 or servo > 4: self.returnDebug = self.__handleTraceDebug("Detached servo must be 1-4") return True return self.__sendCommand("SD" + str(servo)) # ***** # (H)ousekeeping and debugging commands # ***** def setExceptionOn(self): self.exceptionOn = True return def setExceptionOff(self): self.exceptionOn = False return def setDebugOn(self): self.setLastTime() self.__handleTraceDebug("@setDebugOn") self.debugOn = True return self.__sendCommand("HD1") def setTraceOn(self): self.setLastTime() self.__handleTraceDebug("@setTraceOn") self.traceOn = True self.traceFile = self.file( "arduino.log", "w" ) return self.__sendCommand("HT1") def dumpTrace(self): self.setLastTime() self.__handleTraceDebug("@dumpTrace") if self.__sendCommand("HU" + str(True)) == 0: traceLine = "" while (traceLine != "#EOD#"): traceLine = self.serial.readline().rstrip() self.__handleTraceDebug("Arduino - " + traceLine) def setDebugOff(self): self.setLastTime() self.__handleTraceDebug("@setDebugOff") self.debugOn = False return self.__sendCommand("HD0") def setTraceOff(self): self.setLastTime() self.__handleTraceDebug("@setTraceOff") self.traceOn = False self.traceFile.close() return self.__sendCommand("HT0") def setLastTime(self): if self.traceOn: self.lastTime = time.ctime().split(" ")[3] self.__handleTraceDebug("@setLastTime:" + self.lastTime) return self.__sendCommand("HL" + self.lastTime) def resyncHandshake(self): # Attempt to get back to a good handshake sendingCommand = True retries = self.commRetries while sendingCommand: # Establish a handshake self.serial.write(("%").encode("ISO-8859-1")) if self.serial.read().decode("ISO-8859-1") == "#": break retries = retries - 1 if retries == 0: self.returnDebug = self.__handleTraceDebug("timeout on resyncing handshake with Arduino") self.returnValue = 0 self.returnStatus = True return self.__handleRaiseException() return False def referenceVoltage(self): self.setLastTime() self.__handleTraceDebug("@referenceVoltage") return self.__sendCommand("HV") # ***** # (C)ompass commands # ***** def compassInit(self): self.setLastTime() self.__handleTraceDebug("@compassInit") return self.__sendCommand("CI") def compassBearing(self): self.setLastTime() self.__handleTraceDebug("@compassBearing") return self.__sendCommand("CB") def compassPitch(self): self.setLastTime() self.__handleTraceDebug("@compassPitch") return self.__sendCommand("CP") def compassRoll(self): self.setLastTime() self.__handleTraceDebug("@compassRoll") return self.__sendCommand("CR") # ***** # (L)CD command # ***** def LCD(self, line, message): self.setLastTime() self.__handleTraceDebug("@LCD") message = message[:16] message = message.ljust(16, " ") return self.__sendCommand("CL" + str(line) + message) # ***** # (C)ustom command - LED Blinker # ***** def blinkLED(self, pin=13, blinks=0, duration=0, state=0): self.setLastTime() if pin not in self.board['pwm'] or pin in self.board['disabled']: self.returnDebug = "Invalid pin (" + str(pin) + ") specified for 'LED Blinker'" return -1 self.__handleTraceDebug("@blinkLED for pin " + str(pin)) return self.__sendCommand("CX" + str(pin) + "," + str(blinks)) # ------------------------------------------------------------------------ def file(self, filename, option='a'): return open(filename, option) #EOF
Private
[
?
]
Run code
Submit