mirror of
https://github.com/akuker/RASCSI.git
synced 2024-12-07 06:52:52 +00:00
OLED Screen: Inquire device status over the protobuf interface (#349)
* Inquire device status over the protobuf interface * Correct docstring * Direct URL to make website * Add protobuf lib to gitignore
This commit is contained in:
parent
8ab16a1fb2
commit
e10a99c24c
1
.gitignore
vendored
1
.gitignore
vendored
@ -8,5 +8,6 @@ __pycache__
|
||||
src/web/current
|
||||
src/web/rascsi_interface_pb2.py
|
||||
src/oled_monitor/current
|
||||
src/oled_monitor/rascsi_interface_pb2.py
|
||||
src/raspberrypi/hfdisk/
|
||||
*~
|
||||
|
@ -4,7 +4,8 @@
|
||||
# Updates to output rascsi status to an OLED display
|
||||
# Copyright (C) 2020 Tony Kuker
|
||||
# Author: Tony Kuker
|
||||
# Developed for: https://www.amazon.com/MakerFocus-Display-SSD1306-3-3V-5V-Arduino/dp/B079BN2J8V
|
||||
# Developed for:
|
||||
# https://www.makerfocus.com/collections/oled/products/2pcs-i2c-oled-display-module-0-91-inch-i2c-ssd1306-oled-display-module-1
|
||||
#
|
||||
# All other code:
|
||||
# Copyright (c) 2017 Adafruit Industries
|
||||
@ -36,6 +37,7 @@ import busio
|
||||
import adafruit_ssd1306
|
||||
from PIL import Image, ImageDraw, ImageFont
|
||||
import subprocess
|
||||
import rascsi_interface_pb2 as proto
|
||||
|
||||
WIDTH = 128
|
||||
HEIGHT = 32 # Change to 64 if needed
|
||||
@ -87,39 +89,148 @@ font = ImageFont.load_default()
|
||||
# Some other nice fonts to try: http://www.dafont.com/bitmap.php
|
||||
# font = ImageFont.truetype('Minecraftia.ttf', 8)
|
||||
|
||||
def device_list():
|
||||
"""
|
||||
Sends a DEVICES_INFO command to the server.
|
||||
Returns a list of dicts with info on all attached devices.
|
||||
"""
|
||||
command = proto.PbCommand()
|
||||
command.operation = proto.PbOperation.DEVICES_INFO
|
||||
data = send_pb_command(command.SerializeToString())
|
||||
result = proto.PbResult()
|
||||
result.ParseFromString(data)
|
||||
|
||||
device_list = []
|
||||
n = 0
|
||||
|
||||
while n < len(result.devices_info.devices):
|
||||
did = result.devices_info.devices[n].id
|
||||
dun = result.devices_info.devices[n].unit
|
||||
dtype = proto.PbDeviceType.Name(result.devices_info.devices[n].type)
|
||||
dstat = result.devices_info.devices[n].status
|
||||
dprop = result.devices_info.devices[n].properties
|
||||
|
||||
# Building the status string
|
||||
dstat_msg = []
|
||||
if dprop.read_only == True:
|
||||
dstat_msg.append("Read-Only")
|
||||
if dstat.protected == True and dprop.protectable == True:
|
||||
dstat_msg.append("Write-Protected")
|
||||
if dstat.removed == True and dprop.removable == True:
|
||||
dstat_msg.append("No Media")
|
||||
if dstat.locked == True and dprop.lockable == True:
|
||||
dstat_msg.append("Locked")
|
||||
|
||||
from os import path
|
||||
dpath = result.devices_info.devices[n].file.name
|
||||
dfile = path.basename(dpath)
|
||||
dparam = result.devices_info.devices[n].params
|
||||
dven = result.devices_info.devices[n].vendor
|
||||
dprod = result.devices_info.devices[n].product
|
||||
drev = result.devices_info.devices[n].revision
|
||||
dblock = result.devices_info.devices[n].block_size
|
||||
dsize = int(result.devices_info.devices[n].block_count) * int(dblock)
|
||||
|
||||
device_list.append(
|
||||
{
|
||||
"id": did,
|
||||
"un": dun,
|
||||
"device_type": dtype,
|
||||
"status": ", ".join(dstat_msg),
|
||||
"path": dpath,
|
||||
"file": dfile,
|
||||
"params": dparam,
|
||||
"vendor": dven,
|
||||
"product": dprod,
|
||||
"revision": drev,
|
||||
"block_size": dblock,
|
||||
"size": dsize,
|
||||
}
|
||||
)
|
||||
n += 1
|
||||
|
||||
return device_list
|
||||
|
||||
def send_pb_command(payload):
|
||||
"""
|
||||
Takes a str containing a serialized protobuf as argument.
|
||||
Establishes a socket connection with RaSCSI.
|
||||
"""
|
||||
# Host and port number where rascsi is listening for socket connections
|
||||
HOST = 'localhost'
|
||||
PORT = 6868
|
||||
|
||||
counter = 0
|
||||
tries = 100
|
||||
error_msg = ""
|
||||
|
||||
import socket
|
||||
while counter < tries:
|
||||
try:
|
||||
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
|
||||
s.connect((HOST, PORT))
|
||||
return send_over_socket(s, payload)
|
||||
except socket.error as error:
|
||||
counter += 1
|
||||
logging.warning("The RaSCSI service is not responding - attempt " + \
|
||||
str(counter) + "/" + str(tries))
|
||||
error_msg = str(error)
|
||||
|
||||
logging.error(error_msg)
|
||||
|
||||
|
||||
def send_over_socket(s, payload):
|
||||
"""
|
||||
Takes a socket object and str payload with serialized protobuf.
|
||||
Sends payload to RaSCSI over socket and captures the response.
|
||||
Tries to extract and interpret the protobuf header to get response size.
|
||||
Reads data from socket in 2048 bytes chunks until all data is received.
|
||||
"""
|
||||
from struct import pack, unpack
|
||||
|
||||
# Prepending a little endian 32bit header with the message size
|
||||
s.send(pack("<i", len(payload)))
|
||||
s.send(payload)
|
||||
|
||||
# Receive the first 4 bytes to get the response header
|
||||
response = s.recv(4)
|
||||
if len(response) >= 4:
|
||||
# Extracting the response header to get the length of the response message
|
||||
response_length = unpack("<i", response)[0]
|
||||
# Reading in chunks, to handle a case where the response message is very large
|
||||
chunks = []
|
||||
bytes_recvd = 0
|
||||
while bytes_recvd < response_length:
|
||||
chunk = s.recv(min(response_length - bytes_recvd, 2048))
|
||||
if chunk == b'':
|
||||
logging.error("Read an empty chunk from the socket. \
|
||||
Socket connection has dropped unexpectedly. \
|
||||
RaSCSI may have has crashed.")
|
||||
chunks.append(chunk)
|
||||
bytes_recvd = bytes_recvd + len(chunk)
|
||||
response_message = b''.join(chunks)
|
||||
return response_message
|
||||
else:
|
||||
logging.error("The response from RaSCSI did not contain a protobuf header. \
|
||||
RaSCSI may have crashed.")
|
||||
|
||||
while True:
|
||||
|
||||
# Draw a black filled box to clear the image.
|
||||
draw.rectangle((0,0,WIDTH,HEIGHT), outline=0, fill=0)
|
||||
|
||||
cmd = "rasctl -l"
|
||||
rascsi_list = subprocess.check_output(cmd, shell=True).decode(sys.stdout.encoding)
|
||||
rascsi_list = device_list()
|
||||
|
||||
y_pos = top
|
||||
# Draw all of the meaningful data to the 'image'
|
||||
#
|
||||
# Example rascstl -l output:
|
||||
# pi@rascsi:~ $ rasctl -l
|
||||
#
|
||||
# +----+----+------+-------------------------------------
|
||||
# | ID | UN | TYPE | DEVICE STATUS
|
||||
# +----+----+------+-------------------------------------
|
||||
# | 1 | 0 | SCHD | /home/pi/harddisk.hda
|
||||
# | 6 | 0 | SCCD | NO MEDIA
|
||||
# +----+----+------+-------------------------------------
|
||||
# pi@rascsi:~ $
|
||||
for line in rascsi_list.split('\n'):
|
||||
# Skip empty strings, divider lines and the header line...
|
||||
if (len(line) == 0) or line.startswith("+---") or line.startswith("| ID | UN"):
|
||||
continue
|
||||
else:
|
||||
if line.startswith("| "):
|
||||
fields = line.split('|')
|
||||
output = str.strip(fields[1]) + " " + str.strip(fields[3]) + " " + os.path.basename(str.strip(fields[4]))
|
||||
else:
|
||||
output = "No image mounted!"
|
||||
draw.text((x, y_pos), output, font=font, fill=255)
|
||||
y_pos += 8
|
||||
if len(rascsi_list):
|
||||
for line in rascsi_list:
|
||||
output = f"{line['id']} {line['device_type']} {line['file']}"
|
||||
draw.text((x, y_pos), output, font=font, fill=255)
|
||||
y_pos += 8
|
||||
else:
|
||||
output = "No image mounted!"
|
||||
draw.text((x, y_pos), output, font=font, fill=255)
|
||||
y_pos += 8
|
||||
|
||||
# If there is still room on the screen, we'll display the time. If there's not room it will just be clipped
|
||||
draw.text((x, y_pos), datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S"), font=font, fill=255)
|
||||
|
@ -12,3 +12,4 @@ pyusb==1.1.1
|
||||
rpi-ws281x==4.2.5
|
||||
RPi.GPIO==0.7.0
|
||||
sysv-ipc==1.1.0
|
||||
protobuf==3.17.3
|
||||
|
Loading…
Reference in New Issue
Block a user