# -*- coding: utf-8 -*-
#
# Class: USBinterface - Interface between RapsberryPi and Arduino
#
# Version: 0.1
# Creator: meigrafd
# Copyright (C) 2016 by meiraspi@gmail.com published under the MIT License
#
import time
import serial
import logging
class interfaceError(Exception):
def __init__(self, message):
self.message = message
def __str__(self):
return repr(self.message)
class USBinterfaceClass(object):
"""
Methods implementing an interface between RaspberryPi and an Arduino
"""
# Interface control and information attributes
returnStatus = 0 # Status of call (0=Good, -1=Error)
returnValue = "" # Return value (set as appropriate)
returnDebug = "" # Return message (set as appropriate)
returnLastCall = "" # Last method called from class
initFailed = False # Set to true if our _init_ failed (e.g. serial port issue)
commRetries = 30 # Number of communication retries
commTimeout = 1 # Timeout wait on communication retries
traceFile = "" # Pointer for a trace file
lastTime = "" # Last time stamp shared with the Arduino
debugOn = False # Print error messages
traceOn = False # Log all interactions
exceptionOn = False # Throw exceptions on error
responseRetries = 3 # Number of times to retry from a response mess up
responseRetry = 0 # Current retry
dialogDebug = False # Set to true for debug statements across response
# *****
# Define the pins available for our Arduino board
# *****
board = {
'pins' : tuple(x for x in range(54)),
'digital' : tuple(x for x in range(54)),
'analog' : tuple(x for x in range(16)),
'pwm' : tuple(x for x in range(2,14)),
'disabled' : (0, 1, 14, 15) # Rx, Tx, Crystal
}
# Initialize the serial port on instantiation and close on deletion
def __init__(self, port="/dev/ttyACM0", baudrate=38400):
try:
self.serial = serial.Serial(port, baudrate, timeout=self.commTimeout)
except:
self.initFailed = True
self.returnDebug = "Error opening serial port '" + port + "'"
return self.__handleRaiseException()
print("Initialized USBinterfaceClass")
def __del__(self):
if not self.initFailed and self.serial:
self.serial.close()
def printD(self, line):
if self.dialogDebug:
print(line)
# Internal methods to send a command and receive a response
def __sendCommand(self, command):
# Attempt to get a good handshake and then send the command
sendingCommand = True
retries = self.commRetries
while sendingCommand:
# Establish a handshake
self.serial.write(("%").encode("ISO-8859-1"))
self.printD("sendCommand: sent %")
test = self.serial.read().decode("ISO-8859-1")
if test == "#":
self.printD("sendCommand: got #")
#self.printD("sendCommand: good handshake")
# With a good handshake send the command
self.serial.write((command).encode("ISO-8859-1"))
# Terminate the command
self.serial.write(("@").encode("ISO-8859-1"))
self.printD("sendCommand: sent " + str(command) + "@")
# Done waiting to send the command
sendingCommand = False
retries = retries - 1
if retries == 0:
self.returnDebug = self.__handleTraceDebug("timeout on handshake with Arduino")
self.returnStatus = True
self.returnValue = True
return self.__handleRaiseException()
# With a good handshake try to get a response to our command
retries = self.commRetries
while retries > 0:
# Get the response from the Arduino
response = self.__getResponse(command)
# No response so retry
if response == -999:
self.printD("sendCommand: no response - retry")
retries = retries - 1
# Got a response so back we go
elif response == True or response == False:
self.printD("sendCommand: got response")
self.returnStatus = response
return self.returnValue
retries = retries - 1
self.returnDebug = self.__handleTraceDebug("timeout on command response from Arduino")
self.returnStatus = True
self.returnValue = True
return self.__handleRaiseException()
def __getResponse(self, command):
returnLine = (self.serial.readline().strip()).decode("ISO-8859-1")
# Do some cleanup on the returned line
if returnLine == "":
self.printD("getResponse: blank line")
return -1
returnLine = returnLine.replace("#","")
# First character should be a valid return code, if not we will ignore this line
if returnLine.startswith("0"):
response = False
if returnLine.startswith("1"):
response = True
# Split off the return values if any
returnArgs = returnLine.split(",")
try:
self.returnValue = returnArgs[1].strip()
except IndexError:
self.returnValue = 0
try:
self.returnDebug = returnArgs[2]
except IndexError:
self.returnDebug = ""
try:
self.__handleTraceDebug("returned " + str(response) + " value " + str(self.returnValue) + " " + self.returnDebug)
except:
if self.responseRetry < self.responseRetries:
self.responseRetry += 1
self.printD("getResponse: response retry " + str(responseRetry) + " of " + str(responseRetries))
self.__sendCommand(command)
else:
self.printD("getResponse: giving up on response retries ")
self.responseRetry = 0
return self.__handleRaiseException()
self.printD("getResponse: valid response of " + str(response) + ": " + str(returnLine))
if response:
return self.__handleRaiseException()
else:
return response
# *****
# Raise exception intercepter
# *****
def __handleRaiseException(self):
if self.exceptionOn:
raise interfaceError(self.returnDebug)
else:
return True
# *****
# Trace/debug intercepter
# *****
def __handleTraceDebug(self, message):
if message[0] == "@":
self.returnLastCall = message
if self.debugOn:
print(message)
if self.traceOn:
self.traceFile.write(self.lastTime + " - " + message)
self.traceFile.write("\n")
return message
#-------------------------------------------------------------------
# COMMAND METHODS
#-------------------------------------------------------------------
# *****
# (P)in mode commands
# *****
def pinModeInput(self, pin):
self.setLastTime()
if pin not in self.board['pins']:
self.returnDebug = "Invalid pin (" + str(pin) + ") specified for 'Mode Input'"
return -1
self.__handleTraceDebug("@pinModeInput for pin " + str(pin))
return self.__sendCommand("PI" + str(pin))
def pinModeOutput(self, pin):
self.setLastTime()
if pin not in self.board['pins']:
self.returnDebug = "Invalid pin (" + str(pin) + ") specified for 'Mode Output'"
return -1
self.__handleTraceDebug("@pinModeOutput for pin " + str(pin))
return self.__sendCommand("PO" + str(pin))
# *****
# (D)igital commands
# *****
def setHigh(self, pin):
self.setLastTime()
if pin not in self.board['digital']:
self.returnDebug = "Invalid pin (" + str(pin) + ") specified for 'Set High'"
return -1
self.__handleTraceDebug("@setHigh for pin " + str(pin))
return self.__sendCommand("DH" + str(pin))
def setLow(self, pin):
self.setLastTime()
if pin not in self.board['digital']:
self.returnDebug = "Invalid pin (" + str(pin) + ") specified for 'Set Low'"
return -1
self.__handleTraceDebug("@setLow for pin " + str(pin))
return self.__sendCommand("DL" + str(pin))
def digitalRead(self, pin):
self.setLastTime()
if pin not in self.board['digital']:
self.returnDebug = "Invalid pin (" + str(pin) + ") specified for 'Digital Read'"
return -1
self.__handleTraceDebug("@digitalRead for pin " + str(pin))
return self.__sendCommand("DR" + str(pin))
def digitalWrite(self, pin, value):
self.setLastTime()
if pin not in self.board['digital']:
self.returnDebug = "Invalid pin (" + str(pin) + ") specified for 'Digital Write'"
return -1
self.__handleTraceDebug("@digitalWrite for pin " + str(pin))
return self.__sendCommand("DW" + str(pin) + "," + str(value))
# *****
# (A)nalog commands
# *****
def analogRead(self, pin):
self.setLastTime()
if pin not in self.board['analog'] or pin in self.board['disabled']:
self.returnDebug = "Invalid pin (" + str(pin) + ") specified for 'Analog Read'"
return -1
self.__handleTraceDebug("@analogRead for pin " + str(pin))
return self.__sendCommand("AR" + str(pin))
# *****
# PWM commands - set as (A)nalog
# *****
def pwmWrite(self, pin, value):
self.setLastTime()
if pin not in self.board['pwm'] or pin in self.board['disabled']:
self.returnDebug = "Invalid pin (" + str(pin) + ") specified for 'PWM Write'"
return -1
self.__handleTraceDebug("@pwmWrite for pin " + str(pin) + " value " + str(value))
return self.__sendCommand("AW" + str(pin) + "," + str(value))
def pwmRead(self, pin):
self.setLastTime()
if pin not in self.board['pwm'] or pin in self.board['disabled']:
self.returnDebug = "Invalid pin (" + str(pin) + ") specified for 'PWM Read'"
return -1
self.__handleTraceDebug("@pwmRead for pin " + str(pin))
return self.__sendCommand("AR" + str(pin))
def pwmPulseLow(self, pin, trigger):
self.setLastTime()
if pin not in self.board['pwm'] or pin in self.board['disabled']:
self.returnDebug = "Invalid pin (" + str(pin) + ") specified for 'PWM Pulse Low'"
return -1
if trigger not in self.board['digital']:
self.returnDebug = "Invalid pin (" + str(pin) + ") specified for trigger in 'PWM Pulse Low'"
return -1
self.__handleTraceDebug("@pwmPulseLow for pin " + str(pin) + " trigger " + str(trigger))
return self.__sendCommand("AL" + str(pin) + "," + str(trigger))
def pwmPulseHigh(self, pin, trigger):
self.setLastTime()
if pin not in self.board['pwm'] or pin in self.board['disabled']:
self.returnDebug = "Invalid pin (" + str(pin) + ") specified for 'PWM Pulse High'"
return -1
if trigger not in self.board['digital']:
self.returnDebug = "Invalid pin (" + str(pin) + ") specified for trigger in 'PWM Pulse High'"
return -1
self.__handleTraceDebug("@pwmPulseHigh for pin " + str(pin) + " trigger " + str(trigger))
return self.__sendCommand("AH" + str(pin) + "," + str(trigger))
def USping(self, echo, trigger):
self.setLastTime()
if echo not in self.board['pwm'] or echo in self.board['disabled']:
self.returnDebug = "Invalid pin (" + str(echo) + ") specified for 'US ping'"
return -1
if trigger not in self.board['digital']:
self.returnDebug = "Invalid pin (" + str(trigger) + ") specified for trigger in 'US ping'"
return -1
self.__handleTraceDebug("@USping for echo " + str(echo) + " trigger " + str(trigger))
return self.__sendCommand("AP" + str(echo) + "," + str(trigger))
# *****
# (S)ervo commands
# *****
def attachServo(self, servo, pin):
self.setLastTime()
if pin not in self.board['digital']:
self.returnDebug = "Invalid pin (" + str(pin) + ") specified for 'Attach Servo'"
return -1
self.__handleTraceDebug("@attachServo " + str(servo) + " pin " + str(pin))
if servo < 0 or servo > 4:
self.returnDebug = self.__handleTraceDebug("Attached servo must be 1-4")
return True
return self.__sendCommand("SA" + str(servo) + "," + str(pin))
def commandServo(self, servo, command):
self.setLastTime()
self.__handleTraceDebug("@commandServo " + str(servo) + " command " + str(command))
if servo < 0 or servo > 4:
self.returnDebug = self.__handleTraceDebug("Commanded servo must be 1-4")
return True
return self.__sendCommand("SC" + str(servo) + "," + str(command))
def detachServo(self, servo):
self.setLastTime()
self.__handleTraceDebug("@detachServo " + str(servo))
if servo < 0 or servo > 4:
self.returnDebug = self.__handleTraceDebug("Detached servo must be 1-4")
return True
return self.__sendCommand("SD" + str(servo))
# *****
# (H)ousekeeping and debugging commands
# *****
def setExceptionOn(self):
self.exceptionOn = True
return
def setExceptionOff(self):
self.exceptionOn = False
return
def setDebugOn(self):
self.setLastTime()
self.__handleTraceDebug("@setDebugOn")
self.debugOn = True
return self.__sendCommand("HD1")
def setTraceOn(self):
self.setLastTime()
self.__handleTraceDebug("@setTraceOn")
self.traceOn = True
self.traceFile = self.file( "arduino.log", "w" )
return self.__sendCommand("HT1")
def dumpTrace(self):
self.setLastTime()
self.__handleTraceDebug("@dumpTrace")
if self.__sendCommand("HU" + str(True)) == 0:
traceLine = ""
while (traceLine != "#EOD#"):
traceLine = self.serial.readline().rstrip()
self.__handleTraceDebug("Arduino - " + traceLine)
def setDebugOff(self):
self.setLastTime()
self.__handleTraceDebug("@setDebugOff")
self.debugOn = False
return self.__sendCommand("HD0")
def setTraceOff(self):
self.setLastTime()
self.__handleTraceDebug("@setTraceOff")
self.traceOn = False
self.traceFile.close()
return self.__sendCommand("HT0")
def setLastTime(self):
if self.traceOn:
self.lastTime = time.ctime().split(" ")[3]
self.__handleTraceDebug("@setLastTime:" + self.lastTime)
return self.__sendCommand("HL" + self.lastTime)
def resyncHandshake(self):
# Attempt to get back to a good handshake
sendingCommand = True
retries = self.commRetries
while sendingCommand:
# Establish a handshake
self.serial.write(("%").encode("ISO-8859-1"))
if self.serial.read().decode("ISO-8859-1") == "#":
break
retries = retries - 1
if retries == 0:
self.returnDebug = self.__handleTraceDebug("timeout on resyncing handshake with Arduino")
self.returnValue = 0
self.returnStatus = True
return self.__handleRaiseException()
return False
def referenceVoltage(self):
self.setLastTime()
self.__handleTraceDebug("@referenceVoltage")
return self.__sendCommand("HV")
# *****
# (C)ompass commands
# *****
def compassInit(self):
self.setLastTime()
self.__handleTraceDebug("@compassInit")
return self.__sendCommand("CI")
def compassBearing(self):
self.setLastTime()
self.__handleTraceDebug("@compassBearing")
return self.__sendCommand("CB")
def compassPitch(self):
self.setLastTime()
self.__handleTraceDebug("@compassPitch")
return self.__sendCommand("CP")
def compassRoll(self):
self.setLastTime()
self.__handleTraceDebug("@compassRoll")
return self.__sendCommand("CR")
# *****
# (L)CD command
# *****
def LCD(self, line, message):
self.setLastTime()
self.__handleTraceDebug("@LCD")
message = message[:16]
message = message.ljust(16, " ")
return self.__sendCommand("CL" + str(line) + message)
# *****
# (C)ustom command - LED Blinker
# *****
def blinkLED(self, pin=13, blinks=0, duration=0, state=0):
self.setLastTime()
if pin not in self.board['pwm'] or pin in self.board['disabled']:
self.returnDebug = "Invalid pin (" + str(pin) + ") specified for 'LED Blinker'"
return -1
self.__handleTraceDebug("@blinkLED for pin " + str(pin))
return self.__sendCommand("CX" + str(pin) + "," + str(blinks))
# ------------------------------------------------------------------------
def file(self, filename, option='a'):
return open(filename, option)
#EOF