Refactoring of OLED screen Python script (#413)

* Split up source into modules

* Add symlink to .pylintrc in the racsci-web dir

* Resolve pylint warnings

* Resolve pylint warnings

* Split out methods into separate modules

* Resolve pylint warnings

* Clean up docstrings
This commit is contained in:
Daniel Markstedt 2021-11-02 18:20:42 -07:00 committed by GitHub
parent 0f52e72915
commit 4219e6918c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 302 additions and 254 deletions

1
src/oled_monitor/.pylintrc Symbolic link
View File

@ -0,0 +1 @@
../.pylintrc

View File

@ -0,0 +1,38 @@
"""
Linux interrupt handling module
"""
import signal
class GracefulInterruptHandler():
"""
Class for handling Linux signal interrupts
"""
def __init__(self, signals=(signal.SIGINT, signal.SIGTERM)):
self.signals = signals
self.original_handlers = {}
self.interrupted = False
self.released = False
def __enter__(self):
for sig in self.signals:
self.original_handlers[sig] = signal.getsignal(sig)
signal.signal(sig, self.handler)
return self
def handler(self, signum, frame):
self.release()
self.interrupted = True
def __exit__(self, exception_type, exception_value, traceback):
self.release()
def release(self):
if self.released:
return False
for sig in self.signals:
signal.signal(sig, self.original_handlers[sig])
self.released = True
return True

View File

@ -0,0 +1,20 @@
"""
Module with methods that interact with the Pi's Linux system
"""
def get_ip_and_host():
"""
Use a mock socket connection to identify the Pi's hostname and IP address
"""
from socket import socket, gethostname, AF_INET, SOCK_DGRAM
host = gethostname()
sock = socket(AF_INET, SOCK_DGRAM)
try:
# mock ip address; doesn't have to be reachable
sock.connect(('10.255.255.255', 1))
ip_addr = sock.getsockname()[0]
except Exception:
ip_addr = '127.0.0.1'
finally:
sock.close()
return ip_addr, host

View File

@ -0,0 +1,51 @@
"""
Module for commands sent to the RaSCSI backend service.
"""
from os import path
from socket_cmds import send_pb_command
import rascsi_interface_pb2 as proto
def device_list():
"""
Sends a DEVICES_INFO command to the server.
Returns a list of dicts with info on all attached devices.
"""
command = proto.PbCommand()
command.operation = proto.PbOperation.DEVICES_INFO
data = send_pb_command(command.SerializeToString())
result = proto.PbResult()
result.ParseFromString(data)
dlist = []
i = 0
while i < len(result.devices_info.devices):
did = result.devices_info.devices[i].id
dtype = proto.PbDeviceType.Name(result.devices_info.devices[i].type)
dstat = result.devices_info.devices[i].status
dprop = result.devices_info.devices[i].properties
# Building the status string
dstat_msg = []
if dstat.protected and dprop.protectable:
dstat_msg.append("Write-Protected")
if dstat.removed and dprop.removable:
dstat_msg.append("No Media")
if dstat.locked and dprop.lockable:
dstat_msg.append("Locked")
dfile = path.basename(result.devices_info.devices[i].file.name)
dven = result.devices_info.devices[i].vendor
dprod = result.devices_info.devices[i].product
dlist.append({
"id": did,
"device_type": dtype,
"status": ", ".join(dstat_msg),
"file": dfile,
"vendor": dven,
"product": dprod,
})
i += 1
return dlist

View File

@ -1,341 +1,208 @@
#!/usr/bin/env python3
#
# RaSCSI Updates:
# Updates to output rascsi status to an OLED display
# Copyright (C) 2020 Tony Kuker
# Author: Tony Kuker
# Developed for:
# https://www.makerfocus.com/collections/oled/products/2pcs-i2c-oled-display-module-0-91-inch-i2c-ssd1306-oled-display-module-1
#
# All other code:
# Copyright (c) 2017 Adafruit Industries
# Author: Tony DiCola & James DeVito
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
"""
RaSCSI Updates:
Updates to output rascsi status to an OLED display
Copyright (C) 2020 Tony Kuker
Author: Tony Kuker
Developed for:
https://www.makerfocus.com/collections/oled/products/2pcs-i2c-oled-display-module-0-91-inch-i2c-ssd1306-oled-display-module-1
All other code:
Copyright (c) 2017 Adafruit Industries
Author: Tony DiCola & James DeVito
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
"""
from time import sleep
from sys import argv, exit
from sys import argv
from collections import deque
from board import I2C
from adafruit_ssd1306 import SSD1306_I2C
from PIL import Image, ImageDraw, ImageFont
from os import path, getcwd
from collections import deque
from struct import pack, unpack
import signal
import socket
import rascsi_interface_pb2 as proto
class GracefulInterruptHandler(object):
def __init__(self, signals=(signal.SIGINT, signal.SIGTERM)):
self.signals = signals
self.original_handlers = {}
def __enter__(self):
self.interrupted = False
self.released = False
for sig in self.signals:
self.original_handlers[sig] = signal.getsignal(sig)
signal.signal(sig, self.handler)
return self
def handler(self, signum, frame):
self.release()
self.interrupted = True
def __exit__(self, type, value, tb):
self.release()
def release(self):
if self.released:
return False
for sig in self.signals:
signal.signal(sig, self.original_handlers[sig])
self.released = True
return True
from interrupt_handler import GracefulInterruptHandler
from pi_cmds import get_ip_and_host
from ractl_cmds import device_list
WIDTH = 128
HEIGHT = 32 # Change to 64 if needed
BORDER = 5
# How long to delay between each update
delay_time_ms = 1000
DELAY_TIME_MS = 1000
# Define the Reset Pin
oled_reset = None
OLED_RESET = None
# init i2c
i2c = I2C()
I2C = I2C()
# 128x32 display with hardware I2C:
oled = SSD1306_I2C(WIDTH, HEIGHT, i2c, addr=0x3C, reset=oled_reset)
OLED = SSD1306_I2C(WIDTH, HEIGHT, I2C, addr=0x3C, reset=OLED_RESET)
print ("Running with the following display:")
print (oled)
print ()
print ("Will update the OLED display every " + str(delay_time_ms) + "ms (approximately)")
print("Running with the following display:")
print(OLED)
print()
print("Will update the OLED display every " + str(DELAY_TIME_MS) + "ms (approximately)")
# Attempt to read the first argument to the script; fall back to 2 (180 degrees)
if len(argv) > 1:
if str(argv[1]) == "0":
rotation = 0
ROTATION = 0
elif str(argv[1]) == "180":
rotation = 2
ROTATION = 2
else:
exit("Only 0 and 180 are valid arguments for screen rotation.")
else:
print("Defaulting to 180 degrees screen rotation.")
rotation = 2
ROTATION = 2
# Clear display.
oled.rotation = rotation
oled.fill(0)
oled.show()
OLED.rotation = ROTATION
OLED.fill(0)
OLED.show()
# Create blank image for drawing.
# Make sure to create image with mode '1' for 1-bit color.
image = Image.new("1", (oled.width, oled.height))
IMAGE = Image.new("1", (OLED.width, OLED.height))
# Get drawing object to draw on image.
draw = ImageDraw.Draw(image)
DRAW = ImageDraw.Draw(IMAGE)
# Draw a black filled box to clear the image.
draw.rectangle((0,0,WIDTH,HEIGHT), outline=0, fill=0)
DRAW.rectangle((0, 0, WIDTH, HEIGHT), outline=0, fill=0)
# Draw some shapes.
# First define some constants to allow easy resizing of shapes.
padding = 0
top = padding
bottom = HEIGHT-padding
# Depending on the font used, you may want to change the value of PADDING
PADDING = 0
TOP = PADDING
BOTTOM = HEIGHT - PADDING
# Move left to right keeping track of the current x position for drawing shapes.
x = 0
X_POS = 0
# Load default font.
#font = ImageFont.load_default()
# Alternatively load a TTF font. Make sure the .ttf font file is in the same directory as the python script!
# When using other fonds, you may need to adjust padding, font size, and line spacing
# Alternatively load a TTF font. Make sure the .ttf font file
# is in the same directory as the python script!
# When using other fonts, you may need to adjust padding, font size, and line spacing.
# Some other nice fonts to try: http://www.dafont.com/bitmap.php
font = ImageFont.truetype('type_writer.ttf', 8)
FONT = ImageFont.truetype('type_writer.ttf', 8)
def device_list():
"""
Sends a DEVICES_INFO command to the server.
Returns a list of dicts with info on all attached devices.
"""
command = proto.PbCommand()
command.operation = proto.PbOperation.DEVICES_INFO
data = send_pb_command(command.SerializeToString())
result = proto.PbResult()
result.ParseFromString(data)
device_list = []
n = 0
while n < len(result.devices_info.devices):
did = result.devices_info.devices[n].id
dtype = proto.PbDeviceType.Name(result.devices_info.devices[n].type)
dstat = result.devices_info.devices[n].status
dprop = result.devices_info.devices[n].properties
# Building the status string
dstat_msg = []
if dstat.protected == True and dprop.protectable == True:
dstat_msg.append("Write-Protected")
if dstat.removed == True and dprop.removable == True:
dstat_msg.append("No Media")
if dstat.locked == True and dprop.lockable == True:
dstat_msg.append("Locked")
dfile = path.basename(result.devices_info.devices[n].file.name)
dven = result.devices_info.devices[n].vendor
dprod = result.devices_info.devices[n].product
device_list.append(
{
"id": did,
"device_type": dtype,
"status": ", ".join(dstat_msg),
"file": dfile,
"vendor": dven,
"product": dprod,
}
)
n += 1
return device_list
def get_ip_and_host():
host = socket.gethostname()
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
# mock ip address; doesn't have to be reachable
s.connect(('10.255.255.255', 1))
ip = s.getsockname()[0]
except Exception:
ip = '127.0.0.1'
finally:
s.close()
return ip, host
def send_pb_command(payload):
"""
Takes a str containing a serialized protobuf as argument.
Establishes a socket connection with RaSCSI.
"""
# Host and port number where rascsi is listening for socket connections
HOST = 'localhost'
PORT = 6868
counter = 0
tries = 100
error_msg = ""
while counter < tries:
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.connect((HOST, PORT))
return send_over_socket(s, payload)
except socket.error as error:
counter += 1
print("The RaSCSI service is not responding - attempt " + \
str(counter) + "/" + str(tries))
error_msg = str(error)
exit(error_msg)
def send_over_socket(s, payload):
"""
Takes a socket object and str payload with serialized protobuf.
Sends payload to RaSCSI over socket and captures the response.
Tries to extract and interpret the protobuf header to get response size.
Reads data from socket in 2048 bytes chunks until all data is received.
"""
# Sending the magic word "RASCSI" to authenticate with the server
s.send(b"RASCSI")
# Prepending a little endian 32bit header with the message size
s.send(pack("<i", len(payload)))
s.send(payload)
# Receive the first 4 bytes to get the response header
response = s.recv(4)
if len(response) >= 4:
# Extracting the response header to get the length of the response message
response_length = unpack("<i", response)[0]
# Reading in chunks, to handle a case where the response message is very large
chunks = []
bytes_recvd = 0
while bytes_recvd < response_length:
chunk = s.recv(min(response_length - bytes_recvd, 2048))
if chunk == b'':
exit("Socket connection has dropped unexpectedly. "
"RaSCSI may have crashed."
)
chunks.append(chunk)
bytes_recvd = bytes_recvd + len(chunk)
response_message = b''.join(chunks)
return response_message
else:
exit("The response from RaSCSI did not contain a protobuf header. "
"RaSCSI may have crashed."
)
IP_ADDR, HOSTNAME = get_ip_and_host()
def formatted_output():
"""
Formats the strings to be displayed on the Screen
Returns a list of str output
Returns a (list) of (str) output
"""
rascsi_list = device_list()
output = []
if len(rascsi_list):
if rascsi_list:
for line in rascsi_list:
if line["device_type"] in ("SCCD", "SCRM", "SCMO"):
if len(line["file"]):
output.append(f"{line['id']} {line['device_type'][2:4]} {line['file']} {line['status']}")
# Print image file name only when there is an image attached
if line["file"]:
output.append(f"{line['id']} {line['device_type'][2:4]} "
f"{line['file']} {line['status']}")
else:
output.append(f"{line['id']} {line['device_type'][2:4]} {line['status']}")
elif line["device_type"] in ("SCDP"):
output.append(f"{line['id']} {line['device_type'][2:4]} {line['vendor']} {line['product']}")
elif line["device_type"] in ("SCBR"):
# Special handling for the DaynaPort device
elif line["device_type"] == "SCDP":
output.append(f"{line['id']} {line['device_type'][2:4]} {line['vendor']} "
f"{line['product']}")
# Special handling for the Host Bridge device
elif line["device_type"] == "SCBR":
output.append(f"{line['id']} {line['device_type'][2:4]} {line['product']}")
# Print only the Vendor/Product info if it's not generic RaSCSI
elif line["vendor"] not in "RaSCSI":
output.append(f"{line['id']} {line['device_type'][2:4]} {line['file']} "
f"{line['vendor']} {line['product']} {line['status']}")
else:
output.append(f"{line['id']} {line['device_type'][2:4]} {line['file']} {line['vendor']} {line['product']} {line['status']}")
output.append(f"{line['id']} {line['device_type'][2:4]} {line['file']} "
f"{line['status']}")
else:
output.append("No image mounted!")
output.append(f"IP {ip} - {host}")
output.append(f"IP {IP_ADDR} - {HOSTNAME}")
return output
def start_splash():
splash = Image.open(f"{cwd}/splash_start.bmp").convert("1")
draw.bitmap((0, 0), splash)
oled.image(splash)
oled.show()
"""
Displays a splash screen for the startup sequence
Make sure the splash bitmap image is in the same dir as this script
"""
splash = Image.open("splash_start.bmp").convert("1")
DRAW.bitmap((0, 0), splash)
OLED.image(splash)
OLED.show()
sleep(6)
def stop_splash():
draw.rectangle((0,0,WIDTH,HEIGHT), outline=0, fill=0)
splash = Image.open(f"{cwd}/splash_stop.bmp").convert("1")
draw.bitmap((0, 0), splash)
oled.image(splash)
oled.show()
cwd = getcwd()
"""
Displays a splash screen for the shutdown sequence
Make sure the splash bitmap image is in the same dir as this script
"""
DRAW.rectangle((0, 0, WIDTH, HEIGHT), outline=0, fill=0)
splash = Image.open("splash_stop.bmp").convert("1")
DRAW.bitmap((0, 0), splash)
OLED.image(splash)
OLED.show()
# Show a startup splash bitmap image before starting the main loop
start_splash()
ip, host = get_ip_and_host()
with GracefulInterruptHandler() as h:
with GracefulInterruptHandler() as handler:
"""
The main loop of displaying attached device info, and other info
"""
while True:
# The reference snapshot of attached devices that will be compared against each cycle
# to identify changes in RaSCSI backend
ref_snapshot = formatted_output()
# The snapshot updated each cycle that will compared with ref_snapshot
snapshot = ref_snapshot
output = deque(snapshot)
# The active output that will be displayed on the screen
active_output = deque(snapshot)
while snapshot == ref_snapshot:
# Draw a black filled box to clear the image.
draw.rectangle((0,0,WIDTH,HEIGHT), outline=0, fill=0)
y_pos = top
for line in output:
draw.text((x, y_pos), line, font=font, fill=255)
DRAW.rectangle((0, 0, WIDTH, HEIGHT), outline=0, fill=0)
y_pos = TOP
for output_line in active_output:
DRAW.text((X_POS, y_pos), output_line, font=FONT, fill=255)
y_pos += 8
# Shift the index of the array by one to get a scrolling effect
if len(output) > 5:
output.rotate(-1)
if len(active_output) > 5:
active_output.rotate(-1)
# Display image.
oled.image(image)
oled.show()
sleep(1000/delay_time_ms)
OLED.image(IMAGE)
OLED.show()
sleep(1000/DELAY_TIME_MS)
snapshot = formatted_output()
if h.interrupted:
if handler.interrupted:
# Catch interrupt signals and show a shutdown splash bitmap image
stop_splash()
exit("Shutting down the OLED display...")

View File

@ -0,0 +1,71 @@
"""
Module for handling socket connections for sending commands
and receiving results from the RaSCSI backend
"""
import socket
from struct import pack, unpack
def send_pb_command(payload):
"""
Takes a (str) containing a serialized protobuf as argument.
Establishes a socket connection with RaSCSI.
"""
# Host and port number where rascsi is listening for socket connections
host = 'localhost'
port = 6868
counter = 0
tries = 100
error_msg = ""
while counter < tries:
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
sock.connect((host, port))
return send_over_socket(sock, payload)
except socket.error as error:
counter += 1
print("The RaSCSI service is not responding - attempt %s/%s",
str(counter), str(tries))
error_msg = str(error)
exit(error_msg)
def send_over_socket(sock, payload):
"""
Takes a socket object and (str) payload with serialized protobuf.
Sends payload to RaSCSI over socket and captures the response.
Tries to extract and interpret the protobuf header to get response size.
Reads data from socket in 2048 bytes chunks until all data is received.
"""
# Sending the magic word "RASCSI" to authenticate with the server
sock.send(b"RASCSI")
# Prepending a little endian 32bit header with the message size
sock.send(pack("<i", len(payload)))
sock.send(payload)
# Receive the first 4 bytes to get the response header
response = sock.recv(4)
if len(response) >= 4:
# Extracting the response header to get the length of the response message
response_length = unpack("<i", response)[0]
# Reading in chunks, to handle a case where the response message is very large
chunks = []
bytes_recvd = 0
while bytes_recvd < response_length:
chunk = sock.recv(min(response_length - bytes_recvd, 2048))
if chunk == b'':
exit("Socket connection has dropped unexpectedly. "
"RaSCSI may have crashed."
)
chunks.append(chunk)
bytes_recvd = bytes_recvd + len(chunk)
response_message = b''.join(chunks)
return response_message
exit("The response from RaSCSI did not contain a protobuf header. "
"RaSCSI may have crashed."
)