mirror of
https://github.com/oliverschmidt/contiki.git
synced 2025-01-07 21:31:37 +00:00
1056 lines
34 KiB
Python
1056 lines
34 KiB
Python
|
|
# See comment in stm32w_flasher.py.
|
|
# Extraction and little adaptation performed by E.Duble (CNRS, LIG).
|
|
|
|
import ftdi
|
|
import os
|
|
import pyudev
|
|
import serial
|
|
import struct
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
import ymodem
|
|
from messages import errorMessage
|
|
from file_utils import fileFormatReader
|
|
from messages import infoMessage
|
|
from messages import warningMessage
|
|
|
|
ACK = 121
|
|
BOOTLOADER_COMMAND_BAUDRATE = 50
|
|
CBUS_IOMODE = 10
|
|
FT232R_PRODUCT = 24577
|
|
FT232R_VENDOR = 1027
|
|
NAK = 31
|
|
STM32F103_PRODUCT = 22337
|
|
STM32F103_PRODUCT_OLD = 22336
|
|
STM32F103_VENDOR = 1155
|
|
StringTypes = None
|
|
TIMEOUT_TO_SWITCH_BETWEEN_APPLICATION_AND_BOOTLOADER = 10
|
|
commands = {'WRITE': [49, 3], 'GO': [33, 3], 'GET_ID': [2, 5], 'ERASE': [67, 2], 'GET': [0, 2], 'READ': [17, 259], 'WRITE_INCREMENTAL': [54, 2], 'GET_VERSION': [1, 5], 'READOUT_PROTECT': [130, 2], 'READOUT_UNPROTECT': [146, 2]}
|
|
pcIfHandle = None
|
|
rs232PortsList = None
|
|
|
|
class FTDI_Interface(object):
|
|
def __init__(self, port):
|
|
self.port = port
|
|
self.resetValue = None
|
|
self.nBootValue = None
|
|
|
|
def close(self):
|
|
return None
|
|
|
|
def getFTDIIdFromComPort(self, port):
|
|
returnValue = None
|
|
deviceIDs = None
|
|
mapping = self.mapComPortsToFTDIId()
|
|
if deviceIDs == None:
|
|
returnValue = (mapping[port])
|
|
else:
|
|
for id in deviceIDs:
|
|
if (mapping[port]) == id:
|
|
returnValue = id
|
|
break
|
|
else:
|
|
continue
|
|
if returnValue is None:
|
|
for id in deviceIDs:
|
|
if (mapping[port]).upper() == id.upper():
|
|
returnValue = id
|
|
break
|
|
else:
|
|
continue
|
|
return returnValue
|
|
|
|
def getFTDIIdFromDevPort(self, port):
|
|
device = pyudev.Device.from_device_file(pyudev.Context(), port)
|
|
return device['ID_SERIAL']
|
|
|
|
def guessResetDirection(self, h):
|
|
if (self.resetValue is None or self.nBootValue is None):
|
|
CBUS_2 = 4
|
|
CBUS_0 = 1
|
|
RESET_LINE = CBUS_2
|
|
GPIO5_LINE = CBUS_0
|
|
resetLow = ((RESET_LINE << 4) | 0)
|
|
resetHigh = ((RESET_LINE << 4) | RESET_LINE)
|
|
GPIO5Low = ((GPIO5_LINE << 4) | 0)
|
|
GPIO5High = ((GPIO5_LINE << 4) | GPIO5_LINE)
|
|
h.usb_write_timeout = 1000
|
|
h.usb_read_timeout = 1000
|
|
ret = ftdi.ftdi_set_baudrate(h, 115200)
|
|
ret = (ret + ftdi.ftdi_setflowctrl(h, ftdi.SIO_DISABLE_FLOW_CTRL))
|
|
ret = (ret + ftdi.ftdi_set_line_property(h, ftdi.BITS_8, ftdi.STOP_BIT_1, ftdi.NONE))
|
|
for i in range(2):
|
|
if (i % 2) == 0:
|
|
resetValue = resetHigh
|
|
nbootValue = GPIO5High
|
|
else:
|
|
resetValue = resetLow
|
|
nbootValue = GPIO5Low
|
|
ftdi.ftdi_set_bitmode(h, resetValue, ftdi.BITMODE_CBUS)
|
|
time.sleep(0.10000000000000001)
|
|
ftdi.ftdi_set_bitmode(h, nbootValue, ftdi.BITMODE_CBUS)
|
|
time.sleep(0.10000000000000001)
|
|
ftdi.ftdi_set_bitmode(h, 0, ftdi.BITMODE_CBUS)
|
|
ftdi.ftdi_usb_purge_rx_buffer(h)
|
|
ftdi.ftdi_usb_purge_tx_buffer(h)
|
|
ftdi.ftdi_write_data(h, struct.pack('B', 127), 1)
|
|
startTime = time.time()
|
|
inbuff = '\x00'
|
|
nbyte_rcvd = 0
|
|
while (nbyte_rcvd == 0 and (time.time() - startTime) < 1):
|
|
nbyte_rcvd = ftdi.ftdi_read_data(h, inbuff, 1)
|
|
continue
|
|
if nbyte_rcvd > 0:
|
|
reply = struct.unpack('B', inbuff)
|
|
if (reply[0]) == 121:
|
|
self.resetValue = resetValue
|
|
self.nBootValue = nbootValue
|
|
break
|
|
else:
|
|
continue
|
|
continue
|
|
return (self.resetValue, self.nBootValue)
|
|
|
|
def open(self):
|
|
return 0
|
|
|
|
def reset(self, bootloader=False):
|
|
handle = ftdi.ftdi_context()
|
|
if ftdi.ftdi_init(handle) < 0:
|
|
print 'Initialization error.',
|
|
print
|
|
sys.exit(-1)
|
|
serialnum = self.getFTDIIdFromDevPort(self.port)
|
|
if subprocess.call(['rmmod', 'ftdi_sio']) != 0:
|
|
print 'Close all processes that may access serial ports. You may also need to run the program as root.',
|
|
print
|
|
sys.exit(-1)
|
|
error = ftdi.ftdi_usb_open_desc(handle, FT232R_VENDOR, FT232R_PRODUCT, None, serialnum)
|
|
if error < 0:
|
|
print ('Unable to open port. Error:' + str(error)),
|
|
print
|
|
subprocess.call(['modprobe', 'ftdi_sio'])
|
|
sys.exit(-1)
|
|
item0, item1 = self.guessResetDirection(handle)
|
|
resetValue = item0
|
|
nBootValue = item1
|
|
if (resetValue, nBootValue) == (None, None):
|
|
ftdi.ftdi_usb_close(handle)
|
|
subprocess.call(['modprobe', 'ftdi_sio'])
|
|
return 1
|
|
else:
|
|
ftdi.ftdi_set_bitmode(handle, resetValue, ftdi.BITMODE_CBUS)
|
|
time.sleep(0.5)
|
|
if bootloader:
|
|
ftdi.ftdi_set_bitmode(handle, nBootValue, ftdi.BITMODE_CBUS)
|
|
time.sleep(0.10000000000000001)
|
|
ftdi.ftdi_set_bitmode(handle, 0, ftdi.BITMODE_CBUS)
|
|
ftdi.ftdi_usb_close(handle)
|
|
subprocess.call(['modprobe', 'ftdi_sio'])
|
|
time.sleep(1)
|
|
return None
|
|
|
|
def setNBOOT(self, value=1):
|
|
returnValue = True
|
|
return returnValue
|
|
|
|
|
|
class STBL(object):
|
|
def __init__(self, serialPort, inputFile=None, startAddress=None, verboseMode=None, packetSize=256, serialMode=True):
|
|
self.serialPort = serialPort
|
|
self.inputFile = inputFile
|
|
self.startAddress = startAddress
|
|
self.verbose = verboseMode
|
|
self.packetSize = packetSize
|
|
self.serialMode = serialMode
|
|
|
|
def bootloaderInit(self):
|
|
returnValue = True
|
|
if (returnValue and self.serialMode):
|
|
reply = None
|
|
if self.verbose:
|
|
infoMessage('Sending byte 7f\n')
|
|
self.serialPort.write(struct.pack('B', 127))
|
|
startTime = time.time()
|
|
while (self.serialPort.inWaiting() == 0 and (time.time() - startTime) < 1):
|
|
continue
|
|
r = self.serialPort.read(1)
|
|
if r:
|
|
reply = struct.unpack(('B' * len(r)), r)
|
|
else:
|
|
reply = None
|
|
if self.verbose:
|
|
infoMessage(("Reply received '%s'\n" % hexString(reply)))
|
|
if reply == None:
|
|
errorMessage('No reply received\n')
|
|
returnValue = False
|
|
else:
|
|
if (reply[0]) != ACK:
|
|
errorMessage(('Unexpected reply %s\n' % hexString(reply)))
|
|
returnValue = False
|
|
if returnValue:
|
|
reply = self.sendCommand('GET', [], 2)
|
|
if (len(reply) < 4 or ((reply[0]) != ACK or (reply[-1]) != ACK)):
|
|
errorMessage(('Unexpected reply %s\n' % hexString(reply)))
|
|
returnValue = False
|
|
else:
|
|
if self.verbose:
|
|
infoMessage(('Bootloader version %d.%d\n' % (((reply[2]) & 15), ((reply[2]) >> 4))))
|
|
if returnValue:
|
|
reply = self.sendCommand('GET_ID')
|
|
if (len(reply) == 0 or ((reply[0]) != ACK or (reply[-1]) != ACK)):
|
|
errorMessage(('Unexpected reply %s\n' % hexString(reply)))
|
|
returnValue = False
|
|
else:
|
|
if self.verbose:
|
|
infoMessage((('Product ID = 0x' + ''.join(('%02X' % a) for a in reply[2:-1])) + '\n'))
|
|
return returnValue
|
|
|
|
def checksum(self, packet):
|
|
if len(packet) > 1:
|
|
checksum = 0
|
|
for p in packet:
|
|
checksum = (checksum ^ p)
|
|
continue
|
|
else:
|
|
checksum = ((~(packet[0])) & 255)
|
|
return checksum
|
|
|
|
def enableReadProtection(self, enable):
|
|
returnValue = True
|
|
if enable == False:
|
|
reply = self.sendCommand('READOUT_UNPROTECT')
|
|
if reply != [ACK, ACK]:
|
|
errorMessage((('Unexpected reply (' + ','.join(('%02x' % a) for a in reply)) + ') \n'))
|
|
returnValue = False
|
|
else:
|
|
item0, item1 = self.memoryRead(134481920, 512)
|
|
returnValue = item0
|
|
cibData = item1
|
|
if returnValue:
|
|
reply = self.sendCommand('ERASE', [[254]], timeout=5)
|
|
if reply != [ACK, ACK]:
|
|
errorMessage(('Unexpected reply %s\n' % repr(reply)))
|
|
returnValue = False
|
|
if returnValue:
|
|
arg1 = u32ToArray(134481922)
|
|
arg1.reverse()
|
|
packet = cibData[2:256]
|
|
packet = ([(len(packet) - 1)] + packet)
|
|
reply = self.sendCommand('WRITE', [arg1, packet], 5)
|
|
if reply != [ACK, ACK, ACK]:
|
|
errorMessage((('Unexpected reply (' + ','.join(('%02x' % a) for a in reply)) + ') \n'))
|
|
returnValue = False
|
|
if returnValue:
|
|
arg1 = u32ToArray(134482176)
|
|
arg1.reverse()
|
|
packet = cibData[256:]
|
|
packet = ([(len(packet) - 1)] + packet)
|
|
reply = self.sendCommand('WRITE', [arg1, packet], 5)
|
|
if reply != [ACK, ACK, ACK]:
|
|
errorMessage((('Unexpected reply (' + ','.join(('%02x' % a) for a in reply)) + ') \n'))
|
|
returnValue = False
|
|
if returnValue:
|
|
reply = self.sendCommand('READOUT_PROTECT')
|
|
if reply != [ACK, ACK]:
|
|
errorMessage((('Unexpected reply (' + ','.join(('%02x' % a) for a in reply)) + ') \n'))
|
|
returnValue = False
|
|
return returnValue
|
|
|
|
def eraseUserFlash(self):
|
|
returnValue = True
|
|
reply = self.sendCommand('ERASE', [[255]], timeout=5)
|
|
if reply != [ACK, ACK]:
|
|
errorMessage(('Unexpected reply %s\n' % repr(reply)))
|
|
returnValue = False
|
|
return returnValue
|
|
|
|
def isReadProtectionActive(self):
|
|
returnValue, memread = self.memoryRead(134479872, 4)
|
|
return (not returnValue)
|
|
|
|
def loadFile(self, inputFile=None, startAddress=None, progressReport=None, doErase=True):
|
|
if inputFile is not None:
|
|
self.inputFile = inputFile
|
|
if startAddress is not None:
|
|
self.startAddress = startAddress
|
|
returnValue = True
|
|
f = fileFormatReader(self.inputFile, self.startAddress)
|
|
try:
|
|
item0, item1 = f.getRawBinary()
|
|
self.startAddress = item0
|
|
file_content = item1
|
|
except IOError:
|
|
errorMessage((('File ' + self.inputFile) + ' open failed\n'))
|
|
returnValue = False
|
|
if returnValue:
|
|
file_size = len(file_content)
|
|
packet = []
|
|
address = self.startAddress
|
|
pages = int(((file_size + 1023) / 1024))
|
|
startPage = (((self.startAddress & 4294966272L) - 134217728) / 1024)
|
|
eraseArg = ([(pages - 1)] + range(startPage, (startPage + pages), 1))
|
|
infoMessage(('Erasing pages from %d to %d...' % (startPage, ((startPage + pages) - 1))))
|
|
if ('STM32W_FLASHER_JLINK_DONT_ERASE' in os.environ or (not doErase)):
|
|
infoMessage('(skipped)', False)
|
|
else:
|
|
reply = self.sendCommand('ERASE', [eraseArg], timeout=10)
|
|
if reply != [ACK, ACK]:
|
|
errorMessage(('Unexpected reply %s\n' % repr(reply)))
|
|
returnValue = False
|
|
if returnValue:
|
|
infoMessage('done\n', False)
|
|
size = 0
|
|
while returnValue:
|
|
packet = []
|
|
packet_size = self.packetSize
|
|
packet_string = file_content[size:(size + packet_size)]
|
|
packet_size = len(packet_string)
|
|
if packet_size == 0:
|
|
infoMessage('\n')
|
|
break
|
|
else:
|
|
size = (size + packet_size)
|
|
packet.extend(packet_string)
|
|
if (len(packet) % 2) != 0:
|
|
packet = (packet + [255])
|
|
packet = ([(len(packet) - 1)] + packet)
|
|
if progressReport:
|
|
progressReport(size, file_size)
|
|
else:
|
|
infoMessage(('Programming %05d/%05d\r' % (size, file_size)))
|
|
arg1 = u32ToArray(address)
|
|
arg1.reverse()
|
|
if (self.serialMode or size == packet_size):
|
|
reply = self.sendCommand('WRITE', [arg1, packet])
|
|
if reply != [ACK, ACK, ACK]:
|
|
errorMessage(('\n\n+Unexpected reply %s, packet_size=%d\n' % (repr(reply), packet_size)))
|
|
returnValue = False
|
|
else:
|
|
retries = 0
|
|
MAX_RETRIES = 3
|
|
while returnValue:
|
|
reply = self.sendCommand('WRITE_INCREMENTAL', [packet])
|
|
if reply != [ACK, ACK]:
|
|
retries = (retries + 1)
|
|
if retries > MAX_RETRIES:
|
|
errorMessage(('\n\nUnexpected reply %s, packet_size=%d\n' % (repr(reply), packet_size)))
|
|
returnValue = False
|
|
break
|
|
else:
|
|
errorMessage(('\n\nUnexpected reply %s, packet_size=%d\n' % (repr(reply), packet_size)))
|
|
infoMessage('Retrying...\n')
|
|
continue
|
|
else:
|
|
break
|
|
address = (address + packet_size)
|
|
continue
|
|
if returnValue:
|
|
if progressReport == None:
|
|
infoMessage('Done\n')
|
|
return returnValue
|
|
|
|
def memoryRead(self, address, size):
|
|
returnValue = True
|
|
currentSize = 0
|
|
memRead = []
|
|
while (returnValue and currentSize < size):
|
|
|
|
packet_size = min(self.packetSize, (size - currentSize))
|
|
arg1 = u32ToArray((address + currentSize))
|
|
arg1.reverse()
|
|
reply = self.sendCommand('READ', [arg1, [(packet_size - 1)]])
|
|
if reply == [NAK]:
|
|
returnValue = False
|
|
else:
|
|
if reply[:3] != [ACK, ACK, ACK]:
|
|
errorMessage(('\n\nXXUnexpected reply %s, packet_size=%d\n' % (repr(reply), packet_size)))
|
|
returnValue = False
|
|
else:
|
|
memRead = (memRead + reply[3:])
|
|
currentSize = (currentSize + packet_size)
|
|
continue
|
|
return (returnValue, memRead)
|
|
|
|
def programCibData(self, cibData):
|
|
returnValue = True
|
|
if returnValue:
|
|
reply = self.sendCommand('ERASE', [[254]], timeout=5)
|
|
if reply != [ACK, ACK]:
|
|
errorMessage(('Unexpected reply %s\n' % repr(reply)))
|
|
returnValue = False
|
|
if returnValue:
|
|
arg1 = u32ToArray(134481920)
|
|
arg1.reverse()
|
|
packet = cibData[:256]
|
|
packet = ([(len(packet) - 1)] + packet)
|
|
reply = self.sendCommand('WRITE', [arg1, packet], 5)
|
|
if reply != [ACK, ACK, ACK]:
|
|
errorMessage((('Unexpected reply (' + ','.join(('%02x' % a) for a in reply)) + ') \n'))
|
|
returnValue = False
|
|
if returnValue:
|
|
arg1 = u32ToArray(134482176)
|
|
arg1.reverse()
|
|
packet = cibData[256:]
|
|
packet = ([(len(packet) - 1)] + packet)
|
|
reply = self.sendCommand('WRITE', [arg1, packet], 5)
|
|
if reply != [ACK, ACK, ACK]:
|
|
errorMessage((('Unexpected reply (' + ','.join(('%02x' % a) for a in reply)) + ') \n'))
|
|
returnValue = False
|
|
return returnValue
|
|
|
|
def programUserFlash(self, inputFile, startAddress=134217728, progressReport=None, doErase=True):
|
|
return self.loadFile(inputFile, startAddress, progressReport, doErase)
|
|
|
|
def sendCommand(self, command, args=[], timeout=2, traceCommands=False):
|
|
def timedRead(port, timeout, serialMode, bytes=1):
|
|
reply = []
|
|
r = ''
|
|
for byte in range(bytes):
|
|
startTime = time.time()
|
|
r1 = ''
|
|
while (time.time() - startTime) < timeout:
|
|
r1 = port.read(1)
|
|
if len(r1) > 0:
|
|
r = (r + r1)
|
|
startTime = time.time()
|
|
break
|
|
else:
|
|
if serialMode:
|
|
continue
|
|
else:
|
|
time.sleep(0.001)
|
|
continue
|
|
if len(r1) == 0:
|
|
break
|
|
else:
|
|
continue
|
|
if len(r) > 0:
|
|
reply.extend(struct.unpack(('B' * len(r)), r))
|
|
return reply
|
|
reply = []
|
|
error = False
|
|
if command in commands:
|
|
item0, item1 = (commands[command])
|
|
commandID = item0
|
|
replyLength = item1
|
|
if command == 'READ':
|
|
replyLength = (((args[1])[0]) + 4)
|
|
else:
|
|
error = True
|
|
if error:
|
|
pass
|
|
else:
|
|
if traceCommands:
|
|
infoMessage(('Sending command: %02x %02x\n' % (commandID, (255 - commandID))))
|
|
self.serialPort.write(struct.pack('BB', commandID, (255 - commandID)))
|
|
r = timedRead(self.serialPort, timeout, self.serialMode, 1)
|
|
reply = (reply + r)
|
|
if (len(r) != 1 or (r[0]) != ACK):
|
|
error = True
|
|
if error:
|
|
pass
|
|
else:
|
|
for arg in args:
|
|
arg = (arg + [self.checksum(arg)])
|
|
if traceCommands:
|
|
infoMessage((('Sending arg:' + ','.join(('%02x' % a) for a in arg)) + '\n'))
|
|
self.serialPort.write(struct.pack(('B' * len(arg)), *arg))
|
|
r = timedRead(self.serialPort, timeout, self.serialMode, 1)
|
|
reply = (reply + r)
|
|
if (len(r) != 1 or (r[0]) != ACK):
|
|
error = True
|
|
break
|
|
else:
|
|
continue
|
|
if ((not error) and len(reply) < replyLength):
|
|
reply = (reply + timedRead(self.serialPort, timeout, self.serialMode, (replyLength - len(reply))))
|
|
if (command == 'GET' and len(reply) == replyLength):
|
|
reply = (reply + timedRead(self.serialPort, timeout, self.serialMode, ((reply[1]) + 2)))
|
|
if traceCommands:
|
|
infoMessage(('Reply was %s\n' % hexString(reply)))
|
|
return reply
|
|
|
|
def setPort(self, serialPort):
|
|
self.serialPort = serialPort
|
|
|
|
def startApplication(self, startAddress=134217728):
|
|
returnValue = True
|
|
if (self.startAddress is not None and startAddress != 0):
|
|
startAddress = self.startAddress
|
|
arg1 = u32ToArray(startAddress)
|
|
arg1.reverse()
|
|
reply = self.sendCommand('GO', [arg1])
|
|
if reply != [ACK, ACK, ACK]:
|
|
errorMessage(('\n\nUnexpected reply %s\n' % repr(reply)))
|
|
returnValue = False
|
|
return returnValue
|
|
|
|
def verifyFile(self, inputFile=None, startAddress=None, progressReport=None):
|
|
if inputFile is not None:
|
|
self.inputFile = inputFile
|
|
if startAddress is not None:
|
|
self.startAddress = startAddress
|
|
returnValue = True
|
|
f = fileFormatReader(self.inputFile, self.startAddress)
|
|
try:
|
|
item0, item1 = f.getRawBinary()
|
|
self.startAddress = item0
|
|
file_content = item1
|
|
except IOError:
|
|
errorMessage((('File ' + self.inputFile) + ' open failed\n'))
|
|
returnValue = False
|
|
if returnValue:
|
|
file_size = len(file_content)
|
|
packet = []
|
|
address = self.startAddress
|
|
size = 0
|
|
errors = 0
|
|
while returnValue:
|
|
packet = []
|
|
packet_size = self.packetSize
|
|
packet_string = file_content[size:(size + packet_size)]
|
|
packet_size = len(packet_string)
|
|
if packet_size == 0:
|
|
infoMessage('\n')
|
|
break
|
|
else:
|
|
size = (size + packet_size)
|
|
packet.extend(packet_string)
|
|
if progressReport:
|
|
progressReport(size, file_size)
|
|
else:
|
|
infoMessage(('Verifying %05d/%05d\r' % (size, file_size)))
|
|
arg1 = u32ToArray(address)
|
|
arg1.reverse()
|
|
reply = self.sendCommand('READ', [arg1, [(len(packet) - 1)]])
|
|
if reply[:3] != [ACK, ACK, ACK]:
|
|
errorMessage(('\n\nUnexpected reply %s, packet_size=%d\n' % (repr(reply), packet_size)))
|
|
returnValue = False
|
|
if len(reply[3:]) != len(packet):
|
|
errorMessage(('Invalid data read, expected length = %d, received bytes = %d\n' % (len(packet), len(reply[3:]))))
|
|
returnValue = False
|
|
if (returnValue and reply[3:] != packet):
|
|
returnValue = False
|
|
infoMessage('Verify failed \n')
|
|
infoMessage(('%-8s: %5s %5s\n' % ('Addr', 'Flash', 'file')))
|
|
for i in range(len(packet)):
|
|
if (reply[(3 + i)]) != (packet[i]):
|
|
infoMessage(('%08x: %02x %02x\n' % ((address + i), (reply[(i + 3)]), (packet[i]))))
|
|
errors = (errors + 1)
|
|
if errors > 64:
|
|
break
|
|
else:
|
|
continue
|
|
else:
|
|
continue
|
|
address = (address + packet_size)
|
|
continue
|
|
if returnValue:
|
|
if progressReport == None:
|
|
infoMessage('Done\n')
|
|
return returnValue
|
|
|
|
def verifyFlash(self, inputFile, startAddress=134217728, progressReport=None):
|
|
return self.verifyFile(inputFile, startAddress, progressReport)
|
|
|
|
|
|
class STM32F_Interface(object):
|
|
APP_RUNNING = 1
|
|
BL_RUNNING = 0
|
|
DOWNLOAD_BL_IMAGE = 10
|
|
DOWNLOAD_IMAGE = 5
|
|
FAIL = 0
|
|
FIRMWARE_VERSION_NONE = 4294967295L
|
|
GET_APP_VERSION = 3
|
|
GET_BL_VERSION = 9
|
|
GET_CODE_TYPE = 2
|
|
IS_APP_PRESENT = 4
|
|
IS_BL_VERSION_OLD = 11
|
|
OK = 1
|
|
REPLY_START_BYTE = 187
|
|
RUN_APPLICATION = 6
|
|
RUN_BOOTLOADER = 7
|
|
SET_nBOOTMODE = 1
|
|
SET_nRESET = 0
|
|
START_BYTE = 170
|
|
STOP_BYTE = 85
|
|
UNKNOWN_COMMAND = 15
|
|
UPLOAD_IMAGE = 8
|
|
replyLength = [4, 4, 4, 7, 4, 4, 4, 4, 4, 7, 4, 4]
|
|
def __init__(self, port, firmwareName):
|
|
self.port = port
|
|
self.bootloaderPort = port
|
|
self.firmwareName = firmwareName
|
|
self.serialPort = None
|
|
|
|
def close(self):
|
|
if self.serialPort:
|
|
self.serialPort.close()
|
|
|
|
def getBootloaderFirmwareVersion(self):
|
|
returnValue = self.FIRMWARE_VERSION_NONE
|
|
reply = self.sendCommand(self.GET_BL_VERSION)
|
|
if reply:
|
|
returnValue = ((((reply[2]) + ((reply[3]) << 8)) + ((reply[4]) << 16)) + ((reply[5]) << 24))
|
|
return returnValue
|
|
|
|
def getFirmwareType(self):
|
|
returnValue = None
|
|
reply = self.sendCommand(self.GET_CODE_TYPE)
|
|
if reply:
|
|
returnValue = (reply[2])
|
|
return returnValue
|
|
|
|
def getFirmwareVersion(self):
|
|
returnValue = self.FIRMWARE_VERSION_NONE
|
|
reply = self.sendCommand(self.GET_APP_VERSION)
|
|
if reply:
|
|
returnValue = ((((reply[2]) + ((reply[3]) << 8)) + ((reply[4]) << 16)) + ((reply[5]) << 24))
|
|
return returnValue
|
|
|
|
def getFirmwareVersionFromFile(self, filename=None, stringMode=False):
|
|
version = None
|
|
if filename is None:
|
|
filename = self.firmwareName
|
|
f = open(filename)
|
|
f.seek(28, 0)
|
|
tag = f.read(4)
|
|
versionValue = f.read(4)
|
|
versionValue = struct.unpack(('B' * len(versionValue)), versionValue)
|
|
f.close()
|
|
if tag == '\xaaU\xaaU':
|
|
version = ((((versionValue[0]) + ((versionValue[1]) << 8)) + ((versionValue[2]) << 16)) + ((versionValue[3]) << 24))
|
|
if stringMode:
|
|
version = self.versionInStringFormat(version)
|
|
return version
|
|
|
|
def isBootloaderFirmwareVersionOld(self):
|
|
returnValue = False
|
|
reply = self.sendCommand(self.IS_BL_VERSION_OLD)
|
|
if reply:
|
|
returnValue = (reply[2]) == self.FAIL
|
|
return returnValue
|
|
|
|
def isFirmwarePresent(self):
|
|
returnValue = None
|
|
reply = self.sendCommand(self.IS_APP_PRESENT)
|
|
if reply:
|
|
returnValue = (reply[2])
|
|
return returnValue
|
|
|
|
def isSTMicroelectronics(self, port):
|
|
return port in self.getSTMPorts()
|
|
|
|
def mapSTMCompositeComPortToBootloaderCOMPort(self, port):
|
|
warningMessage('The following procedure may fail if there are other attached USB devices.\nIn this case, try to unplug all the devices and insert only the one to be programmed, then run the command again.\n')
|
|
return port
|
|
|
|
def open(self):
|
|
error = 0
|
|
try:
|
|
self.serialPort = serial.Serial(port=self.port, baudrate=BOOTLOADER_COMMAND_BAUDRATE, timeout=1)
|
|
time.sleep(0.10000000000000001)
|
|
self.serialPort.flushInput()
|
|
except:
|
|
errorMessage((('Trouble opening port : ' + repr(self.port)) + '\n'))
|
|
error = 1
|
|
return error
|
|
|
|
def reset(self, bootloader=False):
|
|
returnValue = True
|
|
if returnValue:
|
|
reply = self.sendCommand(self.SET_nBOOTMODE, [1])
|
|
returnValue = reply != None
|
|
if returnValue:
|
|
reply = self.sendCommand(self.SET_nRESET, [0])
|
|
returnValue = reply != None
|
|
if returnValue:
|
|
if bootloader:
|
|
reply = self.sendCommand(self.SET_nBOOTMODE, [0])
|
|
returnValue = reply != None
|
|
time.sleep(0.10000000000000001)
|
|
if returnValue:
|
|
reply = self.sendCommand(self.SET_nRESET, [1])
|
|
returnValue = reply != None
|
|
time.sleep(0.10000000000000001)
|
|
time.sleep(0.10000000000000001)
|
|
return returnValue
|
|
|
|
def runBootloader(self):
|
|
returnValue = None
|
|
reply = self.sendCommand(self.RUN_BOOTLOADER, [], True)
|
|
if reply:
|
|
commandReturnValue = (reply[2])
|
|
time.sleep(TIMEOUT_TO_SWITCH_BETWEEN_APPLICATION_AND_BOOTLOADER)
|
|
for i in range(5):
|
|
try:
|
|
self.serialPort = serial.Serial(port=self.bootloaderPort, baudrate=BOOTLOADER_COMMAND_BAUDRATE, timeout=1)
|
|
self.serialPort = checkSerialPort(self.bootloaderPort, self.serialPort)
|
|
time.sleep(0.5)
|
|
self.serialPort.flushInput()
|
|
if self.serialPort is None:
|
|
returnValue = None
|
|
else:
|
|
returnValue = commandReturnValue
|
|
break
|
|
except Exception, inst:
|
|
errorMessage((' '.join(repr(a) for a in inst.args) + '\n'))
|
|
time.sleep(0.5)
|
|
continue
|
|
return returnValue
|
|
|
|
def runFirmware(self):
|
|
returnValue = None
|
|
reply = self.sendCommand(self.RUN_APPLICATION, [], True)
|
|
if reply:
|
|
commandReturnValue = (reply[2])
|
|
time.sleep(TIMEOUT_TO_SWITCH_BETWEEN_APPLICATION_AND_BOOTLOADER)
|
|
for i in range(5):
|
|
try:
|
|
self.serialPort = serial.Serial(port=self.port, baudrate=BOOTLOADER_COMMAND_BAUDRATE, timeout=1)
|
|
self.serialPort = checkSerialPort(self.port, self.serialPort)
|
|
if self.serialPort is None:
|
|
returnValue = None
|
|
else:
|
|
returnValue = commandReturnValue
|
|
break
|
|
except Exception, inst:
|
|
errorMessage((' '.join(repr(a) for a in inst.args) + '\n'))
|
|
time.sleep(0.5)
|
|
continue
|
|
return returnValue
|
|
|
|
def sendCommand(self, command, args=[], closePort=False):
|
|
verbose = False
|
|
returnValue = None
|
|
if command == self.DOWNLOAD_IMAGE:
|
|
filename = (args[0])
|
|
args = []
|
|
replyLength = self.replyLength
|
|
packet = (([self.START_BYTE, (1 + len(args)), command] + args) + [self.STOP_BYTE])
|
|
if verbose:
|
|
print '==>',
|
|
print ':'.join(('%02X' % a) for a in packet),
|
|
print
|
|
packedCommand = struct.pack(('B' * len(packet)), *packet)
|
|
self.serialPort.write(packedCommand)
|
|
if command == self.DOWNLOAD_IMAGE:
|
|
yModem = ymodem.Ymodem(self.serialPort, None)
|
|
if yModem.loadFile(filename):
|
|
returnValue = None
|
|
r = self.serialPort.read((replyLength[command]))
|
|
if closePort:
|
|
self.serialPort.close()
|
|
if len(r) == (replyLength[command]):
|
|
reply = struct.unpack(('B' * len(r)), r)
|
|
if ((reply[0]) == self.REPLY_START_BYTE and ((reply[1]) == ((replyLength[command]) - 3) and (reply[-1]) == self.STOP_BYTE)):
|
|
returnValue = reply
|
|
if verbose:
|
|
if r:
|
|
reply = struct.unpack(('B' * len(r)), r)
|
|
print '<==',
|
|
print ':'.join(('%02X' % a) for a in reply),
|
|
print
|
|
else:
|
|
print '<==',
|
|
print
|
|
return returnValue
|
|
|
|
def setNBOOT(self, value=1):
|
|
returnValue = True
|
|
if returnValue:
|
|
reply = self.sendCommand(self.SET_nBOOTMODE, [value])
|
|
returnValue = reply != None
|
|
return returnValue
|
|
|
|
def upgradeBootloaderFirmwareVersion(self):
|
|
returnValue = True
|
|
reply = self.sendCommand(self.DOWNLOAD_BL_IMAGE)
|
|
if reply:
|
|
returnValue = (reply[2]) == self.OK
|
|
else:
|
|
returnValue = False
|
|
return returnValue
|
|
|
|
def upgradeFirmwareVersion(self, filename=None):
|
|
returnValue = True
|
|
if filename is None:
|
|
filename = self.firmwareName
|
|
if returnValue:
|
|
firmwareType = self.getFirmwareType()
|
|
if firmwareType == self.APP_RUNNING:
|
|
reply = self.runBootloader()
|
|
if reply is None:
|
|
returnValue = False
|
|
else:
|
|
if firmwareType == self.BL_RUNNING:
|
|
pass
|
|
else:
|
|
returnValue = False
|
|
if returnValue:
|
|
if self.getFirmwareType() != self.BL_RUNNING:
|
|
errorMessage('Unable to read from port, please use a USB HUB 1.1 and rerun the command\n')
|
|
returnValue = False
|
|
else:
|
|
reply = self.sendCommand(self.DOWNLOAD_IMAGE, [filename])
|
|
returnValue = (reply[2]) == self.OK
|
|
return returnValue
|
|
|
|
def versionInStringFormat(self, version):
|
|
return ('%d.%d.%d' % (((version >> 16) & 255), ((version >> 8) & 255), (version & 255)))
|
|
|
|
|
|
class rs232Interface(object):
|
|
def __init__(self, port, noReset=False, rfMode=False, eui64=0):
|
|
self.port = port
|
|
self.noReset = noReset
|
|
self.rfMode = rfMode
|
|
self.eui64 = eui64
|
|
self.serialPort = None
|
|
self.portType = portType(port)
|
|
if self.portType == 'FTDI':
|
|
self.portHandle = FTDI_Interface(port)
|
|
else:
|
|
if self.portType == 'STM32':
|
|
firmwareName = 'CompositeForSTM32W.bin'
|
|
if 'STM32W_FLASHER_FORCE_FIRMWARE_NAME' in os.environ:
|
|
firmwareName = (os.environ['STM32W_FLASHER_FORCE_FIRMWARE_NAME'])
|
|
flasher_py_files = os.path.dirname(os.path.abspath(sys.argv[0]))
|
|
firmware_dir = os.path.dirname(flasher_py_files) # parent directory
|
|
self.portHandle = STM32F_Interface(port, os.path.join(firmware_dir, firmwareName))
|
|
|
|
def closePort(self):
|
|
self.serialPort.close()
|
|
|
|
def enableReadProtection(self, flag):
|
|
return self.STBL.enableReadProtection(flag)
|
|
|
|
def eraseUserFlash(self):
|
|
return self.STBL.eraseUserFlash()
|
|
|
|
def init(self, checkFirmwareImage=True):
|
|
error = 0
|
|
if self.noReset:
|
|
if self.rfMode:
|
|
error = self.reset(False)
|
|
time.sleep(0.10000000000000001)
|
|
else:
|
|
error = 0
|
|
else:
|
|
error = self.reset(True, checkFirmwareImage)
|
|
if error == 1:
|
|
errorMessage((('Trouble while resetting board on port : ' + repr(self.port)) + '\n'))
|
|
if error == 0:
|
|
error = self.openPort()
|
|
if error == 0:
|
|
if self.rfMode:
|
|
self.packetSize = 96
|
|
def sendBinary(port, data, length):
|
|
for i in range(length):
|
|
port.write(chr(((data >> (i * 8)) & 255)))
|
|
continue
|
|
sendBinary(self.serialPort, self.eui64, 8)
|
|
sendBinary(self.serialPort, 45067, 2)
|
|
sendBinary(self.serialPort, 15, 1)
|
|
time.sleep(0.5)
|
|
while self.serialPort.inWaiting() > 0:
|
|
sys.stdout.write(self.serialPort.read(self.serialPort.inWaiting()))
|
|
continue
|
|
else:
|
|
self.packetSize = 256
|
|
self.STBL = STBL(self.serialPort, packetSize=self.packetSize, serialMode=(not self.rfMode))
|
|
if self.STBL is not None:
|
|
tmp = self.STBL.bootloaderInit()
|
|
if tmp:
|
|
pass
|
|
else:
|
|
error = 1
|
|
return error
|
|
|
|
def isReadProtectionActive(self):
|
|
return self.STBL.isReadProtectionActive()
|
|
|
|
def memoryRead(self, address, size):
|
|
return self.STBL.memoryRead(address, size)
|
|
|
|
def openPort(self):
|
|
error = 0
|
|
try:
|
|
self.serialPort = serial.Serial(port=self.port, bytesize=8, baudrate=115200, parity='N', timeout=0)
|
|
except:
|
|
errorMessage((('Trouble opening port : ' + repr(self.port)) + '\n'))
|
|
error = 1
|
|
return error
|
|
|
|
def programCibData(self, cibData):
|
|
return self.STBL.programCibData(cibData)
|
|
|
|
def programUserFlash(self, inputFile, startAddress=134217728, progressReport=None, doErase=True):
|
|
return self.STBL.programUserFlash(inputFile, startAddress, progressReport, doErase)
|
|
|
|
def reset(self, bootloader=False, checkSTM32FFirmware=True):
|
|
returnValue = 0
|
|
if self.portType == 'FTDI':
|
|
self.portHandle.reset(bootloader)
|
|
else:
|
|
if self.portType == 'STM32':
|
|
returnValue = self.portHandle.open()
|
|
if checkSTM32FFirmware:
|
|
if returnValue == 0:
|
|
firmwareVersionCurrent = self.portHandle.getFirmwareVersionFromFile(stringMode=False)
|
|
if returnValue == 0:
|
|
firmwareType = self.portHandle.getFirmwareType()
|
|
if (firmwareType is None or (firmwareType != self.portHandle.BL_RUNNING and firmwareType != self.portHandle.APP_RUNNING)):
|
|
errorMessage('Failed to get firmware type: Invalid reply\n')
|
|
returnValue = 1
|
|
if returnValue == 0:
|
|
firmwareVersion = self.portHandle.getFirmwareVersion()
|
|
if firmwareVersion is None:
|
|
errorMessage('Failed to get firmware version:Invalid reply\n')
|
|
returnValue = 1
|
|
else:
|
|
if (firmwareVersion & 4278190080L):
|
|
firmwareVersion = self.portHandle.FIRMWARE_VERSION_NONE
|
|
if returnValue == 0:
|
|
bootloaderFirmwareVersion = self.portHandle.getBootloaderFirmwareVersion()
|
|
if bootloaderFirmwareVersion is None:
|
|
errorMessage('Failed to get bootloader firmware version:Invalid reply\n')
|
|
returnValue = 1
|
|
if bootloaderFirmwareVersion == self.portHandle.FIRMWARE_VERSION_NONE:
|
|
self.portHandle.bootloaderPort = self.portHandle.mapSTMCompositeComPortToBootloaderCOMPort(self.portHandle.port)
|
|
if returnValue == 0:
|
|
if (firmwareType == self.portHandle.BL_RUNNING and (firmwareVersion != self.portHandle.FIRMWARE_VERSION_NONE and firmwareVersion >= firmwareVersionCurrent)):
|
|
if bootloaderFirmwareVersion == self.portHandle.FIRMWARE_VERSION_NONE:
|
|
errorMessage('Please unplug and replug the board to the PC\n')
|
|
returnValue = 2
|
|
else:
|
|
result = self.portHandle.runFirmware()
|
|
if result is not self.portHandle.OK:
|
|
errorMessage('Failed to run firmware: Invalid reply\n')
|
|
returnValue = 1
|
|
if returnValue == 0:
|
|
if (firmwareVersion == self.portHandle.FIRMWARE_VERSION_NONE or firmwareVersion < firmwareVersionCurrent):
|
|
if firmwareVersion == self.portHandle.FIRMWARE_VERSION_NONE:
|
|
infoMessage(('Missing STM32F103 firmware, upgrading to version %s\n' % self.portHandle.getFirmwareVersionFromFile(stringMode=True)))
|
|
else:
|
|
infoMessage(('Old STM32F103 firmware version %s, upgrading to version %s\n' % (self.portHandle.versionInStringFormat(firmwareVersion), self.portHandle.getFirmwareVersionFromFile(stringMode=True))))
|
|
warningMessage('Changing firmware version can break backward compatibility and older version of stm32w_flasher may not work\n')
|
|
if self.portHandle.upgradeFirmwareVersion():
|
|
if ((firmwareVersion == self.portHandle.FIRMWARE_VERSION_NONE or firmwareVersion < 131072) and firmwareVersionCurrent >= 131072):
|
|
errorMessage(('Switching to firmware %s may change your COM port number, please unplug and replug your USB device and re-run this command again\n' % self.portHandle.getFirmwareVersionFromFile(stringMode=True)))
|
|
returnValue = 2
|
|
else:
|
|
result = self.portHandle.runFirmware()
|
|
if result is not self.portHandle.OK:
|
|
errorMessage('Failed to run firmware: Invalid reply\n')
|
|
returnValue = 1
|
|
else:
|
|
errorMessage('Failed to upgrade firmware version: Invalid reply\n')
|
|
returnValue = 1
|
|
if returnValue == 0:
|
|
firmwareVersion = self.portHandle.getFirmwareVersion()
|
|
if firmwareVersion is None:
|
|
errorMessage('Invalid reply\n')
|
|
returnValue = 1
|
|
if returnValue == 0:
|
|
if firmwareVersion >= 131076:
|
|
if self.portHandle.isBootloaderFirmwareVersionOld():
|
|
infoMessage('Upgrading STM32F Bootloader firmware\n')
|
|
if self.portHandle.upgradeBootloaderFirmwareVersion():
|
|
pass
|
|
else:
|
|
errorMessage('Upgrade of the STM32F bootloader failed. This is a CRITICAL error and your board may brick your board\n')
|
|
returnValue = 1
|
|
if returnValue == 0:
|
|
if firmwareVersion > firmwareVersionCurrent:
|
|
warningMessage('Device firmware is more recent than expected. Are you using an old version of this software ?\n')
|
|
if returnValue == 0:
|
|
result = self.portHandle.reset(bootloader)
|
|
if result is None:
|
|
errorMessage('Failed to reset STM32W: Invalid reply\n')
|
|
returnValue = 1
|
|
self.portHandle.close()
|
|
else:
|
|
errorMessage(('Failed to detect port type for %s\n' % self.port))
|
|
returnValue = 1
|
|
return returnValue
|
|
|
|
def setNBOOT(self, value):
|
|
returnValue = 0
|
|
if (self.portHandle and self.portType == 'STM32'):
|
|
self.portHandle.open()
|
|
returnValue = self.portHandle.setNBOOT(value)
|
|
self.portHandle.close()
|
|
return returnValue
|
|
|
|
def startApplication(self, address=134217728):
|
|
if self.noReset is False:
|
|
self.closePort()
|
|
self.setNBOOT(1)
|
|
self.openPort()
|
|
self.STBL.setPort(self.serialPort)
|
|
return self.STBL.startApplication(address)
|
|
|
|
def terminate(self):
|
|
if self.noReset is False:
|
|
self.closePort()
|
|
self.setNBOOT(1)
|
|
self.openPort()
|
|
|
|
def verifyFlash(self, inputFile, startAddress, progressReport):
|
|
return self.STBL.verifyFlash(inputFile, startAddress, progressReport)
|
|
|
|
|
|
|
|
def checkSerialPort(portName, serialPort):
|
|
returnValue = serialPort
|
|
try:
|
|
serialPort.write(' ')
|
|
except serial.serialutil.SerialException, inst:
|
|
errorMessage((' '.join(repr(a) for a in inst.args) + '\n'))
|
|
infoMessage('Write to serial port failed: please try a different USB port or a USB hub 1.1\n')
|
|
returnValue = None
|
|
return returnValue
|
|
|
|
def getAvailableSerialPorts(refreshList=True):
|
|
global rs232PortsList
|
|
if (refreshList or rs232PortsList is None):
|
|
returnValue = {}
|
|
context = pyudev.Context()
|
|
id = [{ 'vendor': FT232R_VENDOR , 'product': FT232R_PRODUCT , 'type': 'FTDI' }, { 'vendor': STM32F103_VENDOR , 'product': STM32F103_PRODUCT , 'type': 'STM32' }, { 'vendor': STM32F103_VENDOR , 'product': STM32F103_PRODUCT_OLD , 'type': 'STM32' }]
|
|
for device in context.list_devices(subsystem='tty', ID_BUS='usb'):
|
|
for serialId in id:
|
|
#device['ID_VENDOR_ID'] (VID) and device['ID_MODEL_ID'] (PID) are in hex, but without the 0x prefix
|
|
if (int(device['ID_VENDOR_ID'], 16) == serialId['vendor']) and (int(device['ID_MODEL_ID'], 16) == serialId['product']):
|
|
#e.g. returnValue[/dev/ttyACM0] = [STM32, serial number]
|
|
returnValue[(device.device_node)] = [(serialId['type']), (device['ID_SERIAL'])]
|
|
rs232PortsList = returnValue
|
|
return rs232PortsList
|
|
|
|
def getFirstAvailableSerialPort():
|
|
returnValue = None
|
|
ports = getAvailableSerialPorts(False)
|
|
if ports != {}:
|
|
returnValue = (sorted(ports.keys())[0])
|
|
return returnValue
|
|
|
|
def hexString(reply):
|
|
if reply is None:
|
|
return ''
|
|
else:
|
|
return ' '.join(('%02x' % a) for a in reply)
|
|
|
|
def portType(port):
|
|
returnValue = None
|
|
ports = getAvailableSerialPorts(False)
|
|
if port in ports:
|
|
returnValue = ((ports[port])[0])
|
|
return returnValue
|
|
|
|
def u32ToArray(v):
|
|
return [(v & 255), ((v >> 8) & 255), ((v >> 16) & 255), ((v >> 24) & 255)]
|
|
|