contiki/tools/stm32w/stm32w_flasher/py_files/rs232_interface.py

1056 lines
34 KiB
Python

# See comment in stm32w_flasher.py.
# Extraction and little adaptation performed by E.Duble (CNRS, LIG).
import ftdi
import os
import pyudev
import serial
import struct
import subprocess
import sys
import time
import ymodem
from messages import errorMessage
from file_utils import fileFormatReader
from messages import infoMessage
from messages import warningMessage
ACK = 121
BOOTLOADER_COMMAND_BAUDRATE = 50
CBUS_IOMODE = 10
FT232R_PRODUCT = 24577
FT232R_VENDOR = 1027
NAK = 31
STM32F103_PRODUCT = 22337
STM32F103_PRODUCT_OLD = 22336
STM32F103_VENDOR = 1155
StringTypes = None
TIMEOUT_TO_SWITCH_BETWEEN_APPLICATION_AND_BOOTLOADER = 10
commands = {'WRITE': [49, 3], 'GO': [33, 3], 'GET_ID': [2, 5], 'ERASE': [67, 2], 'GET': [0, 2], 'READ': [17, 259], 'WRITE_INCREMENTAL': [54, 2], 'GET_VERSION': [1, 5], 'READOUT_PROTECT': [130, 2], 'READOUT_UNPROTECT': [146, 2]}
pcIfHandle = None
rs232PortsList = None
class FTDI_Interface(object):
def __init__(self, port):
self.port = port
self.resetValue = None
self.nBootValue = None
def close(self):
return None
def getFTDIIdFromComPort(self, port):
returnValue = None
deviceIDs = None
mapping = self.mapComPortsToFTDIId()
if deviceIDs == None:
returnValue = (mapping[port])
else:
for id in deviceIDs:
if (mapping[port]) == id:
returnValue = id
break
else:
continue
if returnValue is None:
for id in deviceIDs:
if (mapping[port]).upper() == id.upper():
returnValue = id
break
else:
continue
return returnValue
def getFTDIIdFromDevPort(self, port):
device = pyudev.Device.from_device_file(pyudev.Context(), port)
return device['ID_SERIAL']
def guessResetDirection(self, h):
if (self.resetValue is None or self.nBootValue is None):
CBUS_2 = 4
CBUS_0 = 1
RESET_LINE = CBUS_2
GPIO5_LINE = CBUS_0
resetLow = ((RESET_LINE << 4) | 0)
resetHigh = ((RESET_LINE << 4) | RESET_LINE)
GPIO5Low = ((GPIO5_LINE << 4) | 0)
GPIO5High = ((GPIO5_LINE << 4) | GPIO5_LINE)
h.usb_write_timeout = 1000
h.usb_read_timeout = 1000
ret = ftdi.ftdi_set_baudrate(h, 115200)
ret = (ret + ftdi.ftdi_setflowctrl(h, ftdi.SIO_DISABLE_FLOW_CTRL))
ret = (ret + ftdi.ftdi_set_line_property(h, ftdi.BITS_8, ftdi.STOP_BIT_1, ftdi.NONE))
for i in range(2):
if (i % 2) == 0:
resetValue = resetHigh
nbootValue = GPIO5High
else:
resetValue = resetLow
nbootValue = GPIO5Low
ftdi.ftdi_set_bitmode(h, resetValue, ftdi.BITMODE_CBUS)
time.sleep(0.10000000000000001)
ftdi.ftdi_set_bitmode(h, nbootValue, ftdi.BITMODE_CBUS)
time.sleep(0.10000000000000001)
ftdi.ftdi_set_bitmode(h, 0, ftdi.BITMODE_CBUS)
ftdi.ftdi_usb_purge_rx_buffer(h)
ftdi.ftdi_usb_purge_tx_buffer(h)
ftdi.ftdi_write_data(h, struct.pack('B', 127), 1)
startTime = time.time()
inbuff = '\x00'
nbyte_rcvd = 0
while (nbyte_rcvd == 0 and (time.time() - startTime) < 1):
nbyte_rcvd = ftdi.ftdi_read_data(h, inbuff, 1)
continue
if nbyte_rcvd > 0:
reply = struct.unpack('B', inbuff)
if (reply[0]) == 121:
self.resetValue = resetValue
self.nBootValue = nbootValue
break
else:
continue
continue
return (self.resetValue, self.nBootValue)
def open(self):
return 0
def reset(self, bootloader=False):
handle = ftdi.ftdi_context()
if ftdi.ftdi_init(handle) < 0:
print 'Initialization error.',
print
sys.exit(-1)
serialnum = self.getFTDIIdFromDevPort(self.port)
if subprocess.call(['rmmod', 'ftdi_sio']) != 0:
print 'Close all processes that may access serial ports. You may also need to run the program as root.',
print
sys.exit(-1)
error = ftdi.ftdi_usb_open_desc(handle, FT232R_VENDOR, FT232R_PRODUCT, None, serialnum)
if error < 0:
print ('Unable to open port. Error:' + str(error)),
print
subprocess.call(['modprobe', 'ftdi_sio'])
sys.exit(-1)
item0, item1 = self.guessResetDirection(handle)
resetValue = item0
nBootValue = item1
if (resetValue, nBootValue) == (None, None):
ftdi.ftdi_usb_close(handle)
subprocess.call(['modprobe', 'ftdi_sio'])
return 1
else:
ftdi.ftdi_set_bitmode(handle, resetValue, ftdi.BITMODE_CBUS)
time.sleep(0.5)
if bootloader:
ftdi.ftdi_set_bitmode(handle, nBootValue, ftdi.BITMODE_CBUS)
time.sleep(0.10000000000000001)
ftdi.ftdi_set_bitmode(handle, 0, ftdi.BITMODE_CBUS)
ftdi.ftdi_usb_close(handle)
subprocess.call(['modprobe', 'ftdi_sio'])
time.sleep(1)
return None
def setNBOOT(self, value=1):
returnValue = True
return returnValue
class STBL(object):
def __init__(self, serialPort, inputFile=None, startAddress=None, verboseMode=None, packetSize=256, serialMode=True):
self.serialPort = serialPort
self.inputFile = inputFile
self.startAddress = startAddress
self.verbose = verboseMode
self.packetSize = packetSize
self.serialMode = serialMode
def bootloaderInit(self):
returnValue = True
if (returnValue and self.serialMode):
reply = None
if self.verbose:
infoMessage('Sending byte 7f\n')
self.serialPort.write(struct.pack('B', 127))
startTime = time.time()
while (self.serialPort.inWaiting() == 0 and (time.time() - startTime) < 1):
continue
r = self.serialPort.read(1)
if r:
reply = struct.unpack(('B' * len(r)), r)
else:
reply = None
if self.verbose:
infoMessage(("Reply received '%s'\n" % hexString(reply)))
if reply == None:
errorMessage('No reply received\n')
returnValue = False
else:
if (reply[0]) != ACK:
errorMessage(('Unexpected reply %s\n' % hexString(reply)))
returnValue = False
if returnValue:
reply = self.sendCommand('GET', [], 2)
if (len(reply) < 4 or ((reply[0]) != ACK or (reply[-1]) != ACK)):
errorMessage(('Unexpected reply %s\n' % hexString(reply)))
returnValue = False
else:
if self.verbose:
infoMessage(('Bootloader version %d.%d\n' % (((reply[2]) & 15), ((reply[2]) >> 4))))
if returnValue:
reply = self.sendCommand('GET_ID')
if (len(reply) == 0 or ((reply[0]) != ACK or (reply[-1]) != ACK)):
errorMessage(('Unexpected reply %s\n' % hexString(reply)))
returnValue = False
else:
if self.verbose:
infoMessage((('Product ID = 0x' + ''.join(('%02X' % a) for a in reply[2:-1])) + '\n'))
return returnValue
def checksum(self, packet):
if len(packet) > 1:
checksum = 0
for p in packet:
checksum = (checksum ^ p)
continue
else:
checksum = ((~(packet[0])) & 255)
return checksum
def enableReadProtection(self, enable):
returnValue = True
if enable == False:
reply = self.sendCommand('READOUT_UNPROTECT')
if reply != [ACK, ACK]:
errorMessage((('Unexpected reply (' + ','.join(('%02x' % a) for a in reply)) + ') \n'))
returnValue = False
else:
item0, item1 = self.memoryRead(134481920, 512)
returnValue = item0
cibData = item1
if returnValue:
reply = self.sendCommand('ERASE', [[254]], timeout=5)
if reply != [ACK, ACK]:
errorMessage(('Unexpected reply %s\n' % repr(reply)))
returnValue = False
if returnValue:
arg1 = u32ToArray(134481922)
arg1.reverse()
packet = cibData[2:256]
packet = ([(len(packet) - 1)] + packet)
reply = self.sendCommand('WRITE', [arg1, packet], 5)
if reply != [ACK, ACK, ACK]:
errorMessage((('Unexpected reply (' + ','.join(('%02x' % a) for a in reply)) + ') \n'))
returnValue = False
if returnValue:
arg1 = u32ToArray(134482176)
arg1.reverse()
packet = cibData[256:]
packet = ([(len(packet) - 1)] + packet)
reply = self.sendCommand('WRITE', [arg1, packet], 5)
if reply != [ACK, ACK, ACK]:
errorMessage((('Unexpected reply (' + ','.join(('%02x' % a) for a in reply)) + ') \n'))
returnValue = False
if returnValue:
reply = self.sendCommand('READOUT_PROTECT')
if reply != [ACK, ACK]:
errorMessage((('Unexpected reply (' + ','.join(('%02x' % a) for a in reply)) + ') \n'))
returnValue = False
return returnValue
def eraseUserFlash(self):
returnValue = True
reply = self.sendCommand('ERASE', [[255]], timeout=5)
if reply != [ACK, ACK]:
errorMessage(('Unexpected reply %s\n' % repr(reply)))
returnValue = False
return returnValue
def isReadProtectionActive(self):
returnValue, memread = self.memoryRead(134479872, 4)
return (not returnValue)
def loadFile(self, inputFile=None, startAddress=None, progressReport=None, doErase=True):
if inputFile is not None:
self.inputFile = inputFile
if startAddress is not None:
self.startAddress = startAddress
returnValue = True
f = fileFormatReader(self.inputFile, self.startAddress)
try:
item0, item1 = f.getRawBinary()
self.startAddress = item0
file_content = item1
except IOError:
errorMessage((('File ' + self.inputFile) + ' open failed\n'))
returnValue = False
if returnValue:
file_size = len(file_content)
packet = []
address = self.startAddress
pages = int(((file_size + 1023) / 1024))
startPage = (((self.startAddress & 4294966272L) - 134217728) / 1024)
eraseArg = ([(pages - 1)] + range(startPage, (startPage + pages), 1))
infoMessage(('Erasing pages from %d to %d...' % (startPage, ((startPage + pages) - 1))))
if ('STM32W_FLASHER_JLINK_DONT_ERASE' in os.environ or (not doErase)):
infoMessage('(skipped)', False)
else:
reply = self.sendCommand('ERASE', [eraseArg], timeout=10)
if reply != [ACK, ACK]:
errorMessage(('Unexpected reply %s\n' % repr(reply)))
returnValue = False
if returnValue:
infoMessage('done\n', False)
size = 0
while returnValue:
packet = []
packet_size = self.packetSize
packet_string = file_content[size:(size + packet_size)]
packet_size = len(packet_string)
if packet_size == 0:
infoMessage('\n')
break
else:
size = (size + packet_size)
packet.extend(packet_string)
if (len(packet) % 2) != 0:
packet = (packet + [255])
packet = ([(len(packet) - 1)] + packet)
if progressReport:
progressReport(size, file_size)
else:
infoMessage(('Programming %05d/%05d\r' % (size, file_size)))
arg1 = u32ToArray(address)
arg1.reverse()
if (self.serialMode or size == packet_size):
reply = self.sendCommand('WRITE', [arg1, packet])
if reply != [ACK, ACK, ACK]:
errorMessage(('\n\n+Unexpected reply %s, packet_size=%d\n' % (repr(reply), packet_size)))
returnValue = False
else:
retries = 0
MAX_RETRIES = 3
while returnValue:
reply = self.sendCommand('WRITE_INCREMENTAL', [packet])
if reply != [ACK, ACK]:
retries = (retries + 1)
if retries > MAX_RETRIES:
errorMessage(('\n\nUnexpected reply %s, packet_size=%d\n' % (repr(reply), packet_size)))
returnValue = False
break
else:
errorMessage(('\n\nUnexpected reply %s, packet_size=%d\n' % (repr(reply), packet_size)))
infoMessage('Retrying...\n')
continue
else:
break
address = (address + packet_size)
continue
if returnValue:
if progressReport == None:
infoMessage('Done\n')
return returnValue
def memoryRead(self, address, size):
returnValue = True
currentSize = 0
memRead = []
while (returnValue and currentSize < size):
packet_size = min(self.packetSize, (size - currentSize))
arg1 = u32ToArray((address + currentSize))
arg1.reverse()
reply = self.sendCommand('READ', [arg1, [(packet_size - 1)]])
if reply == [NAK]:
returnValue = False
else:
if reply[:3] != [ACK, ACK, ACK]:
errorMessage(('\n\nXXUnexpected reply %s, packet_size=%d\n' % (repr(reply), packet_size)))
returnValue = False
else:
memRead = (memRead + reply[3:])
currentSize = (currentSize + packet_size)
continue
return (returnValue, memRead)
def programCibData(self, cibData):
returnValue = True
if returnValue:
reply = self.sendCommand('ERASE', [[254]], timeout=5)
if reply != [ACK, ACK]:
errorMessage(('Unexpected reply %s\n' % repr(reply)))
returnValue = False
if returnValue:
arg1 = u32ToArray(134481920)
arg1.reverse()
packet = cibData[:256]
packet = ([(len(packet) - 1)] + packet)
reply = self.sendCommand('WRITE', [arg1, packet], 5)
if reply != [ACK, ACK, ACK]:
errorMessage((('Unexpected reply (' + ','.join(('%02x' % a) for a in reply)) + ') \n'))
returnValue = False
if returnValue:
arg1 = u32ToArray(134482176)
arg1.reverse()
packet = cibData[256:]
packet = ([(len(packet) - 1)] + packet)
reply = self.sendCommand('WRITE', [arg1, packet], 5)
if reply != [ACK, ACK, ACK]:
errorMessage((('Unexpected reply (' + ','.join(('%02x' % a) for a in reply)) + ') \n'))
returnValue = False
return returnValue
def programUserFlash(self, inputFile, startAddress=134217728, progressReport=None, doErase=True):
return self.loadFile(inputFile, startAddress, progressReport, doErase)
def sendCommand(self, command, args=[], timeout=2, traceCommands=False):
def timedRead(port, timeout, serialMode, bytes=1):
reply = []
r = ''
for byte in range(bytes):
startTime = time.time()
r1 = ''
while (time.time() - startTime) < timeout:
r1 = port.read(1)
if len(r1) > 0:
r = (r + r1)
startTime = time.time()
break
else:
if serialMode:
continue
else:
time.sleep(0.001)
continue
if len(r1) == 0:
break
else:
continue
if len(r) > 0:
reply.extend(struct.unpack(('B' * len(r)), r))
return reply
reply = []
error = False
if command in commands:
item0, item1 = (commands[command])
commandID = item0
replyLength = item1
if command == 'READ':
replyLength = (((args[1])[0]) + 4)
else:
error = True
if error:
pass
else:
if traceCommands:
infoMessage(('Sending command: %02x %02x\n' % (commandID, (255 - commandID))))
self.serialPort.write(struct.pack('BB', commandID, (255 - commandID)))
r = timedRead(self.serialPort, timeout, self.serialMode, 1)
reply = (reply + r)
if (len(r) != 1 or (r[0]) != ACK):
error = True
if error:
pass
else:
for arg in args:
arg = (arg + [self.checksum(arg)])
if traceCommands:
infoMessage((('Sending arg:' + ','.join(('%02x' % a) for a in arg)) + '\n'))
self.serialPort.write(struct.pack(('B' * len(arg)), *arg))
r = timedRead(self.serialPort, timeout, self.serialMode, 1)
reply = (reply + r)
if (len(r) != 1 or (r[0]) != ACK):
error = True
break
else:
continue
if ((not error) and len(reply) < replyLength):
reply = (reply + timedRead(self.serialPort, timeout, self.serialMode, (replyLength - len(reply))))
if (command == 'GET' and len(reply) == replyLength):
reply = (reply + timedRead(self.serialPort, timeout, self.serialMode, ((reply[1]) + 2)))
if traceCommands:
infoMessage(('Reply was %s\n' % hexString(reply)))
return reply
def setPort(self, serialPort):
self.serialPort = serialPort
def startApplication(self, startAddress=134217728):
returnValue = True
if (self.startAddress is not None and startAddress != 0):
startAddress = self.startAddress
arg1 = u32ToArray(startAddress)
arg1.reverse()
reply = self.sendCommand('GO', [arg1])
if reply != [ACK, ACK, ACK]:
errorMessage(('\n\nUnexpected reply %s\n' % repr(reply)))
returnValue = False
return returnValue
def verifyFile(self, inputFile=None, startAddress=None, progressReport=None):
if inputFile is not None:
self.inputFile = inputFile
if startAddress is not None:
self.startAddress = startAddress
returnValue = True
f = fileFormatReader(self.inputFile, self.startAddress)
try:
item0, item1 = f.getRawBinary()
self.startAddress = item0
file_content = item1
except IOError:
errorMessage((('File ' + self.inputFile) + ' open failed\n'))
returnValue = False
if returnValue:
file_size = len(file_content)
packet = []
address = self.startAddress
size = 0
errors = 0
while returnValue:
packet = []
packet_size = self.packetSize
packet_string = file_content[size:(size + packet_size)]
packet_size = len(packet_string)
if packet_size == 0:
infoMessage('\n')
break
else:
size = (size + packet_size)
packet.extend(packet_string)
if progressReport:
progressReport(size, file_size)
else:
infoMessage(('Verifying %05d/%05d\r' % (size, file_size)))
arg1 = u32ToArray(address)
arg1.reverse()
reply = self.sendCommand('READ', [arg1, [(len(packet) - 1)]])
if reply[:3] != [ACK, ACK, ACK]:
errorMessage(('\n\nUnexpected reply %s, packet_size=%d\n' % (repr(reply), packet_size)))
returnValue = False
if len(reply[3:]) != len(packet):
errorMessage(('Invalid data read, expected length = %d, received bytes = %d\n' % (len(packet), len(reply[3:]))))
returnValue = False
if (returnValue and reply[3:] != packet):
returnValue = False
infoMessage('Verify failed \n')
infoMessage(('%-8s: %5s %5s\n' % ('Addr', 'Flash', 'file')))
for i in range(len(packet)):
if (reply[(3 + i)]) != (packet[i]):
infoMessage(('%08x: %02x %02x\n' % ((address + i), (reply[(i + 3)]), (packet[i]))))
errors = (errors + 1)
if errors > 64:
break
else:
continue
else:
continue
address = (address + packet_size)
continue
if returnValue:
if progressReport == None:
infoMessage('Done\n')
return returnValue
def verifyFlash(self, inputFile, startAddress=134217728, progressReport=None):
return self.verifyFile(inputFile, startAddress, progressReport)
class STM32F_Interface(object):
APP_RUNNING = 1
BL_RUNNING = 0
DOWNLOAD_BL_IMAGE = 10
DOWNLOAD_IMAGE = 5
FAIL = 0
FIRMWARE_VERSION_NONE = 4294967295L
GET_APP_VERSION = 3
GET_BL_VERSION = 9
GET_CODE_TYPE = 2
IS_APP_PRESENT = 4
IS_BL_VERSION_OLD = 11
OK = 1
REPLY_START_BYTE = 187
RUN_APPLICATION = 6
RUN_BOOTLOADER = 7
SET_nBOOTMODE = 1
SET_nRESET = 0
START_BYTE = 170
STOP_BYTE = 85
UNKNOWN_COMMAND = 15
UPLOAD_IMAGE = 8
replyLength = [4, 4, 4, 7, 4, 4, 4, 4, 4, 7, 4, 4]
def __init__(self, port, firmwareName):
self.port = port
self.bootloaderPort = port
self.firmwareName = firmwareName
self.serialPort = None
def close(self):
if self.serialPort:
self.serialPort.close()
def getBootloaderFirmwareVersion(self):
returnValue = self.FIRMWARE_VERSION_NONE
reply = self.sendCommand(self.GET_BL_VERSION)
if reply:
returnValue = ((((reply[2]) + ((reply[3]) << 8)) + ((reply[4]) << 16)) + ((reply[5]) << 24))
return returnValue
def getFirmwareType(self):
returnValue = None
reply = self.sendCommand(self.GET_CODE_TYPE)
if reply:
returnValue = (reply[2])
return returnValue
def getFirmwareVersion(self):
returnValue = self.FIRMWARE_VERSION_NONE
reply = self.sendCommand(self.GET_APP_VERSION)
if reply:
returnValue = ((((reply[2]) + ((reply[3]) << 8)) + ((reply[4]) << 16)) + ((reply[5]) << 24))
return returnValue
def getFirmwareVersionFromFile(self, filename=None, stringMode=False):
version = None
if filename is None:
filename = self.firmwareName
f = open(filename)
f.seek(28, 0)
tag = f.read(4)
versionValue = f.read(4)
versionValue = struct.unpack(('B' * len(versionValue)), versionValue)
f.close()
if tag == '\xaaU\xaaU':
version = ((((versionValue[0]) + ((versionValue[1]) << 8)) + ((versionValue[2]) << 16)) + ((versionValue[3]) << 24))
if stringMode:
version = self.versionInStringFormat(version)
return version
def isBootloaderFirmwareVersionOld(self):
returnValue = False
reply = self.sendCommand(self.IS_BL_VERSION_OLD)
if reply:
returnValue = (reply[2]) == self.FAIL
return returnValue
def isFirmwarePresent(self):
returnValue = None
reply = self.sendCommand(self.IS_APP_PRESENT)
if reply:
returnValue = (reply[2])
return returnValue
def isSTMicroelectronics(self, port):
return port in self.getSTMPorts()
def mapSTMCompositeComPortToBootloaderCOMPort(self, port):
warningMessage('The following procedure may fail if there are other attached USB devices.\nIn this case, try to unplug all the devices and insert only the one to be programmed, then run the command again.\n')
return port
def open(self):
error = 0
try:
self.serialPort = serial.Serial(port=self.port, baudrate=BOOTLOADER_COMMAND_BAUDRATE, timeout=1)
time.sleep(0.10000000000000001)
self.serialPort.flushInput()
except:
errorMessage((('Trouble opening port : ' + repr(self.port)) + '\n'))
error = 1
return error
def reset(self, bootloader=False):
returnValue = True
if returnValue:
reply = self.sendCommand(self.SET_nBOOTMODE, [1])
returnValue = reply != None
if returnValue:
reply = self.sendCommand(self.SET_nRESET, [0])
returnValue = reply != None
if returnValue:
if bootloader:
reply = self.sendCommand(self.SET_nBOOTMODE, [0])
returnValue = reply != None
time.sleep(0.10000000000000001)
if returnValue:
reply = self.sendCommand(self.SET_nRESET, [1])
returnValue = reply != None
time.sleep(0.10000000000000001)
time.sleep(0.10000000000000001)
return returnValue
def runBootloader(self):
returnValue = None
reply = self.sendCommand(self.RUN_BOOTLOADER, [], True)
if reply:
commandReturnValue = (reply[2])
time.sleep(TIMEOUT_TO_SWITCH_BETWEEN_APPLICATION_AND_BOOTLOADER)
for i in range(5):
try:
self.serialPort = serial.Serial(port=self.bootloaderPort, baudrate=BOOTLOADER_COMMAND_BAUDRATE, timeout=1)
self.serialPort = checkSerialPort(self.bootloaderPort, self.serialPort)
time.sleep(0.5)
self.serialPort.flushInput()
if self.serialPort is None:
returnValue = None
else:
returnValue = commandReturnValue
break
except Exception, inst:
errorMessage((' '.join(repr(a) for a in inst.args) + '\n'))
time.sleep(0.5)
continue
return returnValue
def runFirmware(self):
returnValue = None
reply = self.sendCommand(self.RUN_APPLICATION, [], True)
if reply:
commandReturnValue = (reply[2])
time.sleep(TIMEOUT_TO_SWITCH_BETWEEN_APPLICATION_AND_BOOTLOADER)
for i in range(5):
try:
self.serialPort = serial.Serial(port=self.port, baudrate=BOOTLOADER_COMMAND_BAUDRATE, timeout=1)
self.serialPort = checkSerialPort(self.port, self.serialPort)
if self.serialPort is None:
returnValue = None
else:
returnValue = commandReturnValue
break
except Exception, inst:
errorMessage((' '.join(repr(a) for a in inst.args) + '\n'))
time.sleep(0.5)
continue
return returnValue
def sendCommand(self, command, args=[], closePort=False):
verbose = False
returnValue = None
if command == self.DOWNLOAD_IMAGE:
filename = (args[0])
args = []
replyLength = self.replyLength
packet = (([self.START_BYTE, (1 + len(args)), command] + args) + [self.STOP_BYTE])
if verbose:
print '==>',
print ':'.join(('%02X' % a) for a in packet),
print
packedCommand = struct.pack(('B' * len(packet)), *packet)
self.serialPort.write(packedCommand)
if command == self.DOWNLOAD_IMAGE:
yModem = ymodem.Ymodem(self.serialPort, None)
if yModem.loadFile(filename):
returnValue = None
r = self.serialPort.read((replyLength[command]))
if closePort:
self.serialPort.close()
if len(r) == (replyLength[command]):
reply = struct.unpack(('B' * len(r)), r)
if ((reply[0]) == self.REPLY_START_BYTE and ((reply[1]) == ((replyLength[command]) - 3) and (reply[-1]) == self.STOP_BYTE)):
returnValue = reply
if verbose:
if r:
reply = struct.unpack(('B' * len(r)), r)
print '<==',
print ':'.join(('%02X' % a) for a in reply),
print
else:
print '<==',
print
return returnValue
def setNBOOT(self, value=1):
returnValue = True
if returnValue:
reply = self.sendCommand(self.SET_nBOOTMODE, [value])
returnValue = reply != None
return returnValue
def upgradeBootloaderFirmwareVersion(self):
returnValue = True
reply = self.sendCommand(self.DOWNLOAD_BL_IMAGE)
if reply:
returnValue = (reply[2]) == self.OK
else:
returnValue = False
return returnValue
def upgradeFirmwareVersion(self, filename=None):
returnValue = True
if filename is None:
filename = self.firmwareName
if returnValue:
firmwareType = self.getFirmwareType()
if firmwareType == self.APP_RUNNING:
reply = self.runBootloader()
if reply is None:
returnValue = False
else:
if firmwareType == self.BL_RUNNING:
pass
else:
returnValue = False
if returnValue:
if self.getFirmwareType() != self.BL_RUNNING:
errorMessage('Unable to read from port, please use a USB HUB 1.1 and rerun the command\n')
returnValue = False
else:
reply = self.sendCommand(self.DOWNLOAD_IMAGE, [filename])
returnValue = (reply[2]) == self.OK
return returnValue
def versionInStringFormat(self, version):
return ('%d.%d.%d' % (((version >> 16) & 255), ((version >> 8) & 255), (version & 255)))
class rs232Interface(object):
def __init__(self, port, noReset=False, rfMode=False, eui64=0):
self.port = port
self.noReset = noReset
self.rfMode = rfMode
self.eui64 = eui64
self.serialPort = None
self.portType = portType(port)
if self.portType == 'FTDI':
self.portHandle = FTDI_Interface(port)
else:
if self.portType == 'STM32':
firmwareName = 'CompositeForSTM32W.bin'
if 'STM32W_FLASHER_FORCE_FIRMWARE_NAME' in os.environ:
firmwareName = (os.environ['STM32W_FLASHER_FORCE_FIRMWARE_NAME'])
flasher_py_files = os.path.dirname(os.path.abspath(sys.argv[0]))
firmware_dir = os.path.dirname(flasher_py_files) # parent directory
self.portHandle = STM32F_Interface(port, os.path.join(firmware_dir, firmwareName))
def closePort(self):
self.serialPort.close()
def enableReadProtection(self, flag):
return self.STBL.enableReadProtection(flag)
def eraseUserFlash(self):
return self.STBL.eraseUserFlash()
def init(self, checkFirmwareImage=True):
error = 0
if self.noReset:
if self.rfMode:
error = self.reset(False)
time.sleep(0.10000000000000001)
else:
error = 0
else:
error = self.reset(True, checkFirmwareImage)
if error == 1:
errorMessage((('Trouble while resetting board on port : ' + repr(self.port)) + '\n'))
if error == 0:
error = self.openPort()
if error == 0:
if self.rfMode:
self.packetSize = 96
def sendBinary(port, data, length):
for i in range(length):
port.write(chr(((data >> (i * 8)) & 255)))
continue
sendBinary(self.serialPort, self.eui64, 8)
sendBinary(self.serialPort, 45067, 2)
sendBinary(self.serialPort, 15, 1)
time.sleep(0.5)
while self.serialPort.inWaiting() > 0:
sys.stdout.write(self.serialPort.read(self.serialPort.inWaiting()))
continue
else:
self.packetSize = 256
self.STBL = STBL(self.serialPort, packetSize=self.packetSize, serialMode=(not self.rfMode))
if self.STBL is not None:
tmp = self.STBL.bootloaderInit()
if tmp:
pass
else:
error = 1
return error
def isReadProtectionActive(self):
return self.STBL.isReadProtectionActive()
def memoryRead(self, address, size):
return self.STBL.memoryRead(address, size)
def openPort(self):
error = 0
try:
self.serialPort = serial.Serial(port=self.port, bytesize=8, baudrate=115200, parity='N', timeout=0)
except:
errorMessage((('Trouble opening port : ' + repr(self.port)) + '\n'))
error = 1
return error
def programCibData(self, cibData):
return self.STBL.programCibData(cibData)
def programUserFlash(self, inputFile, startAddress=134217728, progressReport=None, doErase=True):
return self.STBL.programUserFlash(inputFile, startAddress, progressReport, doErase)
def reset(self, bootloader=False, checkSTM32FFirmware=True):
returnValue = 0
if self.portType == 'FTDI':
self.portHandle.reset(bootloader)
else:
if self.portType == 'STM32':
returnValue = self.portHandle.open()
if checkSTM32FFirmware:
if returnValue == 0:
firmwareVersionCurrent = self.portHandle.getFirmwareVersionFromFile(stringMode=False)
if returnValue == 0:
firmwareType = self.portHandle.getFirmwareType()
if (firmwareType is None or (firmwareType != self.portHandle.BL_RUNNING and firmwareType != self.portHandle.APP_RUNNING)):
errorMessage('Failed to get firmware type: Invalid reply\n')
returnValue = 1
if returnValue == 0:
firmwareVersion = self.portHandle.getFirmwareVersion()
if firmwareVersion is None:
errorMessage('Failed to get firmware version:Invalid reply\n')
returnValue = 1
else:
if (firmwareVersion & 4278190080L):
firmwareVersion = self.portHandle.FIRMWARE_VERSION_NONE
if returnValue == 0:
bootloaderFirmwareVersion = self.portHandle.getBootloaderFirmwareVersion()
if bootloaderFirmwareVersion is None:
errorMessage('Failed to get bootloader firmware version:Invalid reply\n')
returnValue = 1
if bootloaderFirmwareVersion == self.portHandle.FIRMWARE_VERSION_NONE:
self.portHandle.bootloaderPort = self.portHandle.mapSTMCompositeComPortToBootloaderCOMPort(self.portHandle.port)
if returnValue == 0:
if (firmwareType == self.portHandle.BL_RUNNING and (firmwareVersion != self.portHandle.FIRMWARE_VERSION_NONE and firmwareVersion >= firmwareVersionCurrent)):
if bootloaderFirmwareVersion == self.portHandle.FIRMWARE_VERSION_NONE:
errorMessage('Please unplug and replug the board to the PC\n')
returnValue = 2
else:
result = self.portHandle.runFirmware()
if result is not self.portHandle.OK:
errorMessage('Failed to run firmware: Invalid reply\n')
returnValue = 1
if returnValue == 0:
if (firmwareVersion == self.portHandle.FIRMWARE_VERSION_NONE or firmwareVersion < firmwareVersionCurrent):
if firmwareVersion == self.portHandle.FIRMWARE_VERSION_NONE:
infoMessage(('Missing STM32F103 firmware, upgrading to version %s\n' % self.portHandle.getFirmwareVersionFromFile(stringMode=True)))
else:
infoMessage(('Old STM32F103 firmware version %s, upgrading to version %s\n' % (self.portHandle.versionInStringFormat(firmwareVersion), self.portHandle.getFirmwareVersionFromFile(stringMode=True))))
warningMessage('Changing firmware version can break backward compatibility and older version of stm32w_flasher may not work\n')
if self.portHandle.upgradeFirmwareVersion():
if ((firmwareVersion == self.portHandle.FIRMWARE_VERSION_NONE or firmwareVersion < 131072) and firmwareVersionCurrent >= 131072):
errorMessage(('Switching to firmware %s may change your COM port number, please unplug and replug your USB device and re-run this command again\n' % self.portHandle.getFirmwareVersionFromFile(stringMode=True)))
returnValue = 2
else:
result = self.portHandle.runFirmware()
if result is not self.portHandle.OK:
errorMessage('Failed to run firmware: Invalid reply\n')
returnValue = 1
else:
errorMessage('Failed to upgrade firmware version: Invalid reply\n')
returnValue = 1
if returnValue == 0:
firmwareVersion = self.portHandle.getFirmwareVersion()
if firmwareVersion is None:
errorMessage('Invalid reply\n')
returnValue = 1
if returnValue == 0:
if firmwareVersion >= 131076:
if self.portHandle.isBootloaderFirmwareVersionOld():
infoMessage('Upgrading STM32F Bootloader firmware\n')
if self.portHandle.upgradeBootloaderFirmwareVersion():
pass
else:
errorMessage('Upgrade of the STM32F bootloader failed. This is a CRITICAL error and your board may brick your board\n')
returnValue = 1
if returnValue == 0:
if firmwareVersion > firmwareVersionCurrent:
warningMessage('Device firmware is more recent than expected. Are you using an old version of this software ?\n')
if returnValue == 0:
result = self.portHandle.reset(bootloader)
if result is None:
errorMessage('Failed to reset STM32W: Invalid reply\n')
returnValue = 1
self.portHandle.close()
else:
errorMessage(('Failed to detect port type for %s\n' % self.port))
returnValue = 1
return returnValue
def setNBOOT(self, value):
returnValue = 0
if (self.portHandle and self.portType == 'STM32'):
self.portHandle.open()
returnValue = self.portHandle.setNBOOT(value)
self.portHandle.close()
return returnValue
def startApplication(self, address=134217728):
if self.noReset is False:
self.closePort()
self.setNBOOT(1)
self.openPort()
self.STBL.setPort(self.serialPort)
return self.STBL.startApplication(address)
def terminate(self):
if self.noReset is False:
self.closePort()
self.setNBOOT(1)
self.openPort()
def verifyFlash(self, inputFile, startAddress, progressReport):
return self.STBL.verifyFlash(inputFile, startAddress, progressReport)
def checkSerialPort(portName, serialPort):
returnValue = serialPort
try:
serialPort.write(' ')
except serial.serialutil.SerialException, inst:
errorMessage((' '.join(repr(a) for a in inst.args) + '\n'))
infoMessage('Write to serial port failed: please try a different USB port or a USB hub 1.1\n')
returnValue = None
return returnValue
def getAvailableSerialPorts(refreshList=True):
global rs232PortsList
if (refreshList or rs232PortsList is None):
returnValue = {}
context = pyudev.Context()
id = [{ 'vendor': FT232R_VENDOR , 'product': FT232R_PRODUCT , 'type': 'FTDI' }, { 'vendor': STM32F103_VENDOR , 'product': STM32F103_PRODUCT , 'type': 'STM32' }, { 'vendor': STM32F103_VENDOR , 'product': STM32F103_PRODUCT_OLD , 'type': 'STM32' }]
for device in context.list_devices(subsystem='tty', ID_BUS='usb'):
for serialId in id:
#device['ID_VENDOR_ID'] (VID) and device['ID_MODEL_ID'] (PID) are in hex, but without the 0x prefix
if (int(device['ID_VENDOR_ID'], 16) == serialId['vendor']) and (int(device['ID_MODEL_ID'], 16) == serialId['product']):
#e.g. returnValue[/dev/ttyACM0] = [STM32, serial number]
returnValue[(device.device_node)] = [(serialId['type']), (device['ID_SERIAL'])]
rs232PortsList = returnValue
return rs232PortsList
def getFirstAvailableSerialPort():
returnValue = None
ports = getAvailableSerialPorts(False)
if ports != {}:
returnValue = (sorted(ports.keys())[0])
return returnValue
def hexString(reply):
if reply is None:
return ''
else:
return ' '.join(('%02x' % a) for a in reply)
def portType(port):
returnValue = None
ports = getAvailableSerialPorts(False)
if port in ports:
returnValue = ((ports[port])[0])
return returnValue
def u32ToArray(v):
return [(v & 255), ((v >> 8) & 255), ((v >> 16) & 255), ((v >> 24) & 255)]