Auto-format Python sources with black, fix all issues reported by flake8 (#1010)

* Update config for black and flake8
* Auto-format Python sources with black
* Fix issues reported by flake8
* Exclude protobuf files from black
* Address formatting feedback
This commit is contained in:
nucleogenic 2022-11-30 05:19:17 +00:00 committed by GitHub
parent 5afc6b911f
commit 315ef9f248
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
44 changed files with 1073 additions and 725 deletions

5
python/.flake8 Normal file
View File

@ -0,0 +1,5 @@
[flake8]
max-line-length = 100
exclude =
venv
rascsi_interface_pb2.py

View File

@ -13,17 +13,11 @@ CONFIG_FILE_SUFFIX = "json"
PROPERTIES_SUFFIX = "properties"
# Supported archive file suffixes
ARCHIVE_FILE_SUFFIXES = [
"zip",
"sit",
"tar",
"gz",
"7z"
]
ARCHIVE_FILE_SUFFIXES = ["zip", "sit", "tar", "gz", "7z"]
# The RESERVATIONS list is used to keep track of the reserved ID memos.
# Initialize with a list of 8 empty strings.
RESERVATIONS = ["" for _ in range(0, 8)]
# Standard error message for shell commands
SHELL_ERROR = "Shell command: \"%s\" led to error: %s"
SHELL_ERROR = 'Shell command: "%s" led to error: %s'

View File

@ -8,8 +8,8 @@ class FailedSocketConnectionException(Exception):
class EmptySocketChunkException(Exception):
"""Raise when a socket payload contains an empty chunk which implies a possible problem. """
"""Raise when a socket payload contains an empty chunk which implies a possible problem."""
class InvalidProtobufResponse(Exception):
"""Raise when a rascsi socket payload contains unpexpected data. """
"""Raise when a rascsi socket payload contains unpexpected data."""

View File

@ -35,6 +35,7 @@ FILE_READ_ERROR = "Unhandled exception when reading file: %s"
FILE_WRITE_ERROR = "Unhandled exception when writing to file: %s"
URL_SAFE = "/:?&"
class FileCmds:
"""
class for methods reading from and writing to the file system
@ -57,15 +58,10 @@ class FileCmds:
files_list = []
for file_path, _dirs, files in walk(dir_path):
# Only list selected file types
# TODO: Refactor for readability?
files = [f for f in files if f.lower().endswith(file_types)]
files_list.extend(
[
(
file,
path.getsize(path.join(file_path, file))
)
for file in files
]
[(file, path.getsize(path.join(file_path, file))) for file in files],
)
return files_list
@ -108,7 +104,7 @@ class FileCmds:
if file.name in prop_files:
process = self.read_drive_properties(
Path(CFG_DIR) / f"{file.name}.{PROPERTIES_SUFFIX}"
)
)
prop = process["conf"]
else:
prop = False
@ -118,12 +114,14 @@ class FileCmds:
try:
archive_info = self._get_archive_info(
f"{server_info['image_dir']}/{file.name}",
_cache_extra_key=file.size
)
_cache_extra_key=file.size,
)
properties_files = [x["path"]
for x in archive_info["members"]
if x["path"].endswith(PROPERTIES_SUFFIX)]
properties_files = [
x["path"]
for x in archive_info["members"]
if x["path"].endswith(PROPERTIES_SUFFIX)
]
for member in archive_info["members"]:
if member["is_dir"] or member["is_resource_fork"]:
@ -132,7 +130,9 @@ class FileCmds:
if PurePath(member["path"]).suffix.lower()[1:] == PROPERTIES_SUFFIX:
member["is_properties_file"] = True
elif f"{member['path']}.{PROPERTIES_SUFFIX}" in properties_files:
member["related_properties_file"] = f"{member['path']}.{PROPERTIES_SUFFIX}"
member[
"related_properties_file"
] = f"{member['path']}.{PROPERTIES_SUFFIX}"
archive_contents.append(member)
except (unarchiver.LsarCommandError, unarchiver.LsarOutputError):
@ -140,14 +140,16 @@ class FileCmds:
size_mb = "{:,.1f}".format(file.size / 1024 / 1024)
dtype = proto.PbDeviceType.Name(file.type)
files.append({
"name": file.name,
"size": file.size,
"size_mb": size_mb,
"detected_type": dtype,
"prop": prop,
"archive_contents": archive_contents,
})
files.append(
{
"name": file.name,
"size": file.size,
"size_mb": size_mb,
"detected_type": dtype,
"prop": prop,
"archive_contents": archive_contents,
}
)
return {"status": result.status, "msg": result.msg, "files": files}
@ -233,22 +235,20 @@ class FileCmds:
Takes (Path) file_path for the file to delete
Returns (dict) with (bool) status and (str) msg
"""
parameters = {
"file_path": file_path
}
parameters = {"file_path": file_path}
if file_path.exists():
file_path.unlink()
return {
"status": True,
"return_code": ReturnCodes.DELETEFILE_SUCCESS,
"parameters": parameters,
}
return {
"status": False,
"return_code": ReturnCodes.DELETEFILE_FILE_NOT_FOUND,
"status": True,
"return_code": ReturnCodes.DELETEFILE_SUCCESS,
"parameters": parameters,
}
return {
"status": False,
"return_code": ReturnCodes.DELETEFILE_FILE_NOT_FOUND,
"parameters": parameters,
}
# noinspection PyMethodMayBeStatic
def rename_file(self, file_path, target_path):
@ -258,21 +258,19 @@ class FileCmds:
- (Path) target_path for the name to rename
Returns (dict) with (bool) status and (str) msg
"""
parameters = {
"target_path": target_path
}
parameters = {"target_path": target_path}
if target_path.parent.exists:
file_path.rename(target_path)
return {
"status": True,
"return_code": ReturnCodes.RENAMEFILE_SUCCESS,
"parameters": parameters,
}
}
return {
"status": False,
"return_code": ReturnCodes.RENAMEFILE_UNABLE_TO_MOVE,
"parameters": parameters,
}
}
# noinspection PyMethodMayBeStatic
def copy_file(self, file_path, target_path):
@ -282,21 +280,19 @@ class FileCmds:
- (Path) target_path for the name to copy to
Returns (dict) with (bool) status and (str) msg
"""
parameters = {
"target_path": target_path
}
parameters = {"target_path": target_path}
if target_path.parent.exists:
copyfile(str(file_path), str(target_path))
return {
"status": True,
"return_code": ReturnCodes.WRITEFILE_SUCCESS,
"parameters": parameters,
}
}
return {
"status": False,
"return_code": ReturnCodes.WRITEFILE_UNABLE_TO_WRITE,
"parameters": parameters,
}
}
def extract_image(self, file_path, members=None, move_properties_files_to_config=True):
"""
@ -312,60 +308,66 @@ class FileCmds:
return {
"status": False,
"return_code": ReturnCodes.EXTRACTIMAGE_NO_FILES_SPECIFIED,
}
}
try:
extract_result = unarchiver.extract_archive(
f"{server_info['image_dir']}/{file_path}",
members=members,
output_dir=server_info["image_dir"],
)
)
properties_files_moved = []
if move_properties_files_to_config:
for file in extract_result["extracted"]:
if file.get("name").endswith(f".{PROPERTIES_SUFFIX}"):
prop_path = Path(CFG_DIR) / file["name"]
if (self.rename_file(
Path(file["absolute_path"]),
prop_path,
)):
properties_files_moved.append({
"status": True,
"name": file["path"],
"path": str(prop_path),
})
if self.rename_file(
Path(file["absolute_path"]),
prop_path,
):
properties_files_moved.append(
{
"status": True,
"name": file["path"],
"path": str(prop_path),
}
)
else:
properties_files_moved.append({
"status": False,
"name": file["path"],
"path": str(prop_path),
})
properties_files_moved.append(
{
"status": False,
"name": file["path"],
"path": str(prop_path),
}
)
return {
"status": True,
"return_code": ReturnCodes.EXTRACTIMAGE_SUCCESS,
"parameters": {
"count": len(extract_result["extracted"]),
},
},
"extracted": extract_result["extracted"],
"skipped": extract_result["skipped"],
"properties_files_moved": properties_files_moved,
}
}
except unarchiver.UnarNoFilesExtractedError:
return {
"status": False,
"return_code": ReturnCodes.EXTRACTIMAGE_NO_FILES_EXTRACTED,
}
except (unarchiver.UnarCommandError, unarchiver.UnarUnexpectedOutputError) as error:
}
except (
unarchiver.UnarCommandError,
unarchiver.UnarUnexpectedOutputError,
) as error:
return {
"status": False,
"return_code": ReturnCodes.EXTRACTIMAGE_COMMAND_ERROR,
"parameters": {
"error": error,
}
}
},
}
# noinspection PyMethodMayBeStatic
def partition_disk(self, file_name, volume_name, disk_format):
@ -399,42 +401,42 @@ class FileCmds:
if disk_format == "HFS":
partitioning_tool = "hfdisk"
commands = [
"i",
"",
"C",
"",
"32",
"Driver_Partition",
"Apple_Driver",
"C",
"",
"",
volume_name,
"Apple_HFS",
"w",
"y",
"p",
]
"i",
"",
"C",
"",
"32",
"Driver_Partition",
"Apple_Driver",
"C",
"",
"",
volume_name,
"Apple_HFS",
"w",
"y",
"p",
]
# Create a DOS label, primary partition, W95 FAT type
elif disk_format == "FAT":
partitioning_tool = "fdisk"
commands = [
"o",
"n",
"p",
"",
"",
"",
"t",
"b",
"w",
]
"o",
"n",
"p",
"",
"",
"",
"t",
"b",
"w",
]
try:
process = Popen(
[partitioning_tool, str(full_file_path)],
stdin=PIPE,
stdout=PIPE,
)
[partitioning_tool, str(full_file_path)],
stdin=PIPE,
stdout=PIPE,
)
for command in commands:
process.stdin.write(bytes(command + "\n", "utf-8"))
process.stdin.flush()
@ -464,7 +466,6 @@ class FileCmds:
return {"status": True, "msg": ""}
# noinspection PyMethodMayBeStatic
def format_hfs(self, file_name, volume_name, driver_path):
"""
@ -514,7 +515,6 @@ class FileCmds:
return {"status": True, "msg": ""}
# noinspection PyMethodMayBeStatic
def format_fat(self, file_name, volume_name, fat_size):
"""
@ -538,21 +538,21 @@ class FileCmds:
else:
logging.info(process.stdout.decode("utf-8"))
self.delete_file(Path(file_name))
return {"status": False, "msg": error.stderr.decode("utf-8")}
return {"status": False, "msg": process.stderr.decode("utf-8")}
except (FileNotFoundError, CalledProcessError) as error:
logging.warning(SHELL_ERROR, " ".join(error.cmd), error.stderr.decode("utf-8"))
self.delete_file(Path(file_name))
return {"status": False, "msg": error.stderr.decode("utf-8")}
args = [
"mkfs.fat",
"-v",
"-F",
fat_size,
"-n",
volume_name,
"/dev/mapper/" + loopback_device,
]
args = [
"mkfs.fat",
"-v",
"-F",
fat_size,
"-n",
volume_name,
"/dev/mapper/" + loopback_device,
]
try:
process = run(
args,
@ -582,7 +582,6 @@ class FileCmds:
return {"status": True, "msg": ""}
def download_file_to_iso(self, url, *iso_args):
"""
Takes (str) url and one or more (str) *iso_args
@ -592,7 +591,7 @@ class FileCmds:
server_info = self.ractl.get_server_info()
file_name = PurePath(url).name
iso_filename = Path(server_info['image_dir']) / f"{file_name}.iso"
iso_filename = Path(server_info["image_dir"]) / f"{file_name}.iso"
with TemporaryDirectory() as tmp_dir:
req_proc = self.download_to_dir(quote(url, safe=URL_SAFE), tmp_dir, file_name)
@ -603,23 +602,30 @@ class FileCmds:
tmp_full_path = Path(tmp_dir) / file_name
if is_zipfile(tmp_full_path):
if "XtraStuf.mac" in str(ZipFile(str(tmp_full_path)).namelist()):
logging.info("MacZip file format detected. Will not unzip to retain resource fork.")
logging.info(
"MacZip file format detected. Will not unzip to retain resource fork."
)
else:
logging.info(
"%s is a zipfile! Will attempt to unzip and store the resulting files.",
tmp_full_path,
)
unzip_proc = asyncio.run(
self.run_async(
"unzip",
[
"-d",
str(tmp_dir),
"-n",
str(tmp_full_path),
],
)
unzip_proc = asyncio.run(self.run_async("unzip", [
"-d",
str(tmp_dir),
"-n",
str(tmp_full_path),
]))
)
if not unzip_proc["returncode"]:
logging.info(
"%s was successfully unzipped. Deleting the zipfile.",
tmp_full_path,
)
)
tmp_full_path.unlink(True)
try:
@ -638,9 +644,7 @@ class FileCmds:
logging.warning(SHELL_ERROR, " ".join(error.cmd), error.stderr.decode("utf-8"))
return {"status": False, "msg": error.stderr.decode("utf-8")}
parameters = {
"value": " ".join(iso_args)
}
parameters = {"value": " ".join(iso_args)}
return {
"status": True,
"return_code": ReturnCodes.DOWNLOADFILETOISO_SUCCESS,
@ -658,10 +662,10 @@ class FileCmds:
try:
with requests.get(
quote(url, safe=URL_SAFE),
stream=True,
headers={"User-Agent": "Mozilla/5.0"},
) as req:
quote(url, safe=URL_SAFE),
stream=True,
headers={"User-Agent": "Mozilla/5.0"},
) as req:
req.raise_for_status()
try:
with open(f"{save_dir}/{file_name}", "wb") as download:
@ -677,10 +681,7 @@ class FileCmds:
logging.info("Response content-type: %s", req.headers["content-type"])
logging.info("Response status code: %s", req.status_code)
parameters = {
"file_name": file_name,
"save_dir": save_dir
}
parameters = {"file_name": file_name, "save_dir": save_dir}
return {
"status": True,
"return_code": ReturnCodes.DOWNLOADTODIR_SUCCESS,
@ -715,28 +716,29 @@ class FileCmds:
reserved_ids_and_memos = []
reserved_ids = self.ractl.get_reserved_ids()["ids"]
for scsi_id in reserved_ids:
reserved_ids_and_memos.append({"id": scsi_id,
"memo": RESERVATIONS[int(scsi_id)]})
dump(
{"version": version,
"devices": devices,
"reserved_ids": reserved_ids_and_memos},
json_file,
indent=4
reserved_ids_and_memos.append(
{"id": scsi_id, "memo": RESERVATIONS[int(scsi_id)]}
)
parameters = {
"target_path": file_path
}
dump(
{
"version": version,
"devices": devices,
"reserved_ids": reserved_ids_and_memos,
},
json_file,
indent=4,
)
parameters = {"target_path": file_path}
return {
"status": True,
"return_code": ReturnCodes.WRITEFILE_SUCCESS,
"parameters": parameters,
}
}
except (IOError, ValueError, EOFError, TypeError) as error:
logging.error(str(error))
self.delete_file(Path(file_path))
return {"status": False, "msg": str(error)}
except:
except Exception:
logging.error(FILE_WRITE_ERROR, file_name)
self.delete_file(Path(file_path))
raise
@ -770,7 +772,7 @@ class FileCmds:
"revision": row["revision"],
"block_size": row["block_size"],
"params": dict(row["params"]),
}
}
if row["image"]:
kwargs["params"]["file"] = row["image"]
self.ractl.attach_device(row["id"], **kwargs)
@ -789,27 +791,27 @@ class FileCmds:
"revision": row["revision"],
"block_size": row["block_size"],
"params": dict(row["params"]),
}
}
if row["image"]:
kwargs["params"]["file"] = row["image"]
self.ractl.attach_device(row["id"], **kwargs)
logging.warning("%s is in an obsolete config file format", file_name)
else:
return {"status": False,
"return_code": ReturnCodes.READCONFIG_INVALID_CONFIG_FILE_FORMAT}
return {
"status": False,
"return_code": ReturnCodes.READCONFIG_INVALID_CONFIG_FILE_FORMAT,
}
parameters = {
"file_name": file_name
}
parameters = {"file_name": file_name}
return {
"status": True,
"return_code": ReturnCodes.READCONFIG_SUCCESS,
"parameters": parameters
}
"parameters": parameters,
}
except (IOError, ValueError, EOFError, TypeError) as error:
logging.error(str(error))
return {"status": False, "msg": str(error)}
except:
except Exception:
logging.error(FILE_READ_ERROR, str(file_path))
raise
@ -823,19 +825,17 @@ class FileCmds:
try:
with open(file_path, "w") as json_file:
dump(conf, json_file, indent=4)
parameters = {
"target_path": str(file_path)
}
parameters = {"target_path": str(file_path)}
return {
"status": True,
"return_code": ReturnCodes.WRITEFILE_SUCCESS,
"parameters": parameters,
}
}
except (IOError, ValueError, EOFError, TypeError) as error:
logging.error(str(error))
self.delete_file(file_path)
return {"status": False, "msg": str(error)}
except:
except Exception:
logging.error(FILE_WRITE_ERROR, str(file_path))
self.delete_file(file_path)
raise
@ -850,19 +850,17 @@ class FileCmds:
try:
with open(file_path) as json_file:
conf = load(json_file)
parameters = {
"file_path": str(file_path)
}
parameters = {"file_path": str(file_path)}
return {
"status": True,
"return_codes": ReturnCodes.READDRIVEPROPS_SUCCESS,
"parameters": parameters,
"conf": conf,
}
}
except (IOError, ValueError, EOFError, TypeError) as error:
logging.error(str(error))
return {"status": False, "msg": str(error)}
except:
except Exception:
logging.error(FILE_READ_ERROR, str(file_path))
raise
@ -877,11 +875,17 @@ class FileCmds:
program,
*args,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE)
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await proc.communicate()
logging.info("Executed command \"%s %s\" with status code %d", program, " ".join(args), proc.returncode)
logging.info(
'Executed command "%s %s" with status code %d',
program,
" ".join(args),
proc.returncode,
)
if stdout:
stdout = stdout.decode()
logging.info("stdout: %s", stdout)

View File

@ -11,6 +11,7 @@ class RaCtlCmds:
"""
Class for commands sent to the RaSCSI backend service.
"""
def __init__(self, sock_cmd: SocketCmds, token=None, locale="en"):
self.sock_cmd = sock_cmd
self.token = token
@ -37,9 +38,13 @@ class RaCtlCmds:
data = self.sock_cmd.send_pb_command(command.SerializeToString())
result = proto.PbResult()
result.ParseFromString(data)
version = (str(result.server_info.version_info.major_version) + "." +
str(result.server_info.version_info.minor_version) + "." +
str(result.server_info.version_info.patch_version))
version = (
str(result.server_info.version_info.major_version)
+ "."
+ str(result.server_info.version_info.minor_version)
+ "."
+ str(result.server_info.version_info.patch_version)
)
log_levels = list(result.server_info.log_level_info.log_levels)
current_log_level = result.server_info.log_level_info.current_log_level
reserved_ids = list(result.server_info.reserved_ids_info.ids)
@ -74,7 +79,7 @@ class RaCtlCmds:
"scrm": scrm,
"scmo": scmo,
"sccd": sccd,
}
}
def get_reserved_ids(self):
"""
@ -137,11 +142,11 @@ class RaCtlCmds:
for key, value in device.properties.default_params.items():
params[key] = value
device_types[proto.PbDeviceType.Name(device.type)] = {
"removable": device.properties.removable,
"supports_file": device.properties.supports_file,
"params": params,
"block_sizes": list(device.properties.block_sizes),
}
"removable": device.properties.removable,
"supports_file": device.properties.supports_file,
"params": params,
"block_sizes": list(device.properties.block_sizes),
}
return {"status": result.status, "device_types": device_types}
def get_removable_device_types(self):
@ -176,8 +181,8 @@ class RaCtlCmds:
device_types = self.get_device_types()
image_device_types = self.get_disk_device_types()
peripheral_device_types = [
x for x in device_types["device_types"] if x not in image_device_types
]
x for x in device_types["device_types"] if x not in image_device_types
]
return peripheral_device_types
def get_image_files_info(self):
@ -205,7 +210,7 @@ class RaCtlCmds:
"images_dir": images_dir,
"image_files": image_files,
"scan_depth": scan_depth,
}
}
def attach_device(self, scsi_id, **kwargs):
"""
@ -245,13 +250,13 @@ class RaCtlCmds:
if current_type != device_type:
parameters = {
"device_type": device_type,
"current_device_type": current_type
"current_device_type": current_type,
}
return {
"status": False,
"return_code": ReturnCodes.ATTACHIMAGE_COULD_NOT_ATTACH,
"parameters": parameters,
}
}
command.operation = proto.PbOperation.INSERT
# Handling attaching a new device
@ -394,20 +399,22 @@ class RaCtlCmds:
dblock = result.devices_info.devices[i].block_size
dsize = int(result.devices_info.devices[i].block_count) * int(dblock)
device_list.append({
"id": did,
"unit": dunit,
"device_type": dtype,
"status": ", ".join(dstat_msg),
"image": dpath,
"file": dfile,
"params": dparam,
"vendor": dven,
"product": dprod,
"revision": drev,
"block_size": dblock,
"size": dsize,
})
device_list.append(
{
"id": did,
"unit": dunit,
"device_type": dtype,
"status": ", ".join(dstat_msg),
"image": dpath,
"file": dfile,
"params": dparam,
"vendor": dven,
"product": dprod,
"revision": drev,
"block_size": dblock,
"size": dsize,
}
)
i += 1
return {"status": result.status, "msg": result.msg, "device_list": device_list}

View File

@ -6,6 +6,7 @@ Module for return codes that are refrenced in the return payloads of the rascsi
# pylint: disable=too-few-public-methods
class ReturnCodes:
"""Class for the return codes used within the rascsi module."""
DELETEFILE_SUCCESS = 0
DELETEFILE_FILE_NOT_FOUND = 1
RENAMEFILE_SUCCESS = 10

View File

@ -7,15 +7,18 @@ import socket
from time import sleep
from struct import pack, unpack
from rascsi.exceptions import (EmptySocketChunkException,
InvalidProtobufResponse,
FailedSocketConnectionException)
from rascsi.exceptions import (
EmptySocketChunkException,
InvalidProtobufResponse,
FailedSocketConnectionException,
)
class SocketCmds:
"""
Class for sending and receiving data over a socket connection with the RaSCSI backend
"""
def __init__(self, host="localhost", port=6868):
self.host = host
self.port = port
@ -38,8 +41,11 @@ class SocketCmds:
return response
except socket.error as error:
counter += 1
logging.warning("The RaSCSI service is not responding - attempt %s/%s",
str(counter), str(tries))
logging.warning(
"The RaSCSI service is not responding - attempt %s/%s",
str(counter),
str(tries),
)
error_msg = str(error)
sleep(0.2)
except EmptySocketChunkException as ex:
@ -75,18 +81,22 @@ class SocketCmds:
bytes_recvd = 0
while bytes_recvd < response_length:
chunk = sock.recv(min(response_length - bytes_recvd, 2048))
if chunk == b'':
error_message = ("Read an empty chunk from the socket. Socket connection has "
"dropped unexpectedly. RaSCSI may have crashed.")
if chunk == b"":
error_message = (
"Read an empty chunk from the socket. Socket connection has "
"dropped unexpectedly. RaSCSI may have crashed."
)
logging.error(error_message)
raise EmptySocketChunkException(error_message)
chunks.append(chunk)
bytes_recvd = bytes_recvd + len(chunk)
response_message = b''.join(chunks)
response_message = b"".join(chunks)
return response_message
error_message = ("The response from RaSCSI did not contain a protobuf header. "
"RaSCSI may have crashed.")
error_message = (
"The response from RaSCSI did not contain a protobuf header. "
"RaSCSI may have crashed."
)
logging.error(error_message)
raise InvalidProtobufResponse(error_message)

View File

@ -12,6 +12,7 @@ from platform import uname
from rascsi.common_settings import SHELL_ERROR
class SysCmds:
"""
Class for commands sent to the Pi's Linux system.
@ -30,7 +31,7 @@ class SysCmds:
["git", "rev-parse", "HEAD"],
capture_output=True,
check=True,
)
)
.stdout.decode("utf-8")
.strip()
)
@ -68,7 +69,7 @@ class SysCmds:
return {
"git": ra_git_version,
"env": f"{hardware}, {env.system} {env.release} {env.machine}",
}
}
@staticmethod
def running_proc(daemon):
@ -82,7 +83,7 @@ class SysCmds:
["ps", "aux"],
capture_output=True,
check=True,
)
)
.stdout.decode("utf-8")
.strip()
)
@ -104,7 +105,7 @@ class SysCmds:
["brctl", "show"],
capture_output=True,
check=True,
)
)
.stdout.decode("utf-8")
.strip()
)
@ -155,7 +156,7 @@ class SysCmds:
sock = socket(AF_INET, SOCK_DGRAM)
try:
# mock ip address; doesn't have to be reachable
sock.connect(('10.255.255.255', 1))
sock.connect(("10.255.255.255", 1))
ip_addr = sock.getsockname()[0]
except Exception:
ip_addr = False
@ -170,10 +171,10 @@ class SysCmds:
"""
try:
process = run(
["hostnamectl", "status", "--pretty"],
capture_output=True,
check=True,
)
["hostnamectl", "status", "--pretty"],
capture_output=True,
check=True,
)
pretty_hostname = process.stdout.decode("utf-8").rstrip()
if pretty_hostname:
return pretty_hostname
@ -188,11 +189,11 @@ class SysCmds:
Set the pretty hostname for the system
"""
try:
process = run(
["sudo", "hostnamectl", "set-hostname", "--pretty", name],
capture_output=False,
check=True,
)
run(
["sudo", "hostnamectl", "set-hostname", "--pretty", name],
capture_output=False,
check=True,
)
except CalledProcessError as error:
logging.error(str(error))
return False
@ -213,9 +214,9 @@ class SysCmds:
if scope:
scope_param = ["-u", scope]
process = run(
["journalctl"] + line_param + scope_param,
capture_output=True,
)
["journalctl"] + line_param + scope_param,
capture_output=True,
)
if process.returncode == 0:
return process.returncode, process.stdout.decode("utf-8")
@ -228,9 +229,9 @@ class SysCmds:
Returns either the disktype output, or the stderr output.
"""
process = run(
["disktype", file_path],
capture_output=True,
)
["disktype", file_path],
capture_output=True,
)
if process.returncode == 0:
return process.returncode, process.stdout.decode("utf-8")
@ -243,9 +244,9 @@ class SysCmds:
Returns either the man2html output, or the stderr output.
"""
process = run(
["man2html", file_path, "-M", "/"],
capture_output=True,
)
["man2html", file_path, "-M", "/"],
capture_output=True,
)
if process.returncode == 0:
return process.returncode, process.stdout.decode("utf-8")
@ -257,9 +258,9 @@ class SysCmds:
Sends a reboot command to the system
"""
process = run(
["sudo", "reboot"],
capture_output=True,
)
["sudo", "reboot"],
capture_output=True,
)
if process.returncode == 0:
return process.returncode, process.stdout.decode("utf-8")
@ -271,9 +272,9 @@ class SysCmds:
Sends a shutdown command to the system
"""
process = run(
["sudo", "shutdown", "-h", "now"],
capture_output=True,
)
["sudo", "shutdown", "-h", "now"],
capture_output=True,
)
if process.returncode == 0:
return process.returncode, process.stdout.decode("utf-8")

View File

@ -4,31 +4,27 @@ Utility module for running system commands with basic logging
import asyncio
import logging
import os
def run(program, args=None):
""" Run a command and return its output """
"""Run a command and return its output"""
return asyncio.run(run_async(program, args))
async def run_async(program, args=None):
""" Run a command in the background """
"""Run a command in the background"""
proc = await asyncio.create_subprocess_exec(
program,
*args,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
program, *args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await proc.communicate()
logging.info(
"Executed command \"%s %s\" with status code %d",
'Executed command "%s %s" with status code %d',
program,
" ".join(args),
proc.returncode
)
proc.returncode,
)
if stdout:
stdout = stdout.decode()
@ -42,4 +38,4 @@ async def run_async(program, args=None):
"returncode": proc.returncode,
"stdout": stdout,
"stderr": stderr,
}
}

View File

@ -25,7 +25,8 @@ def extract_archive(file_path, **kwargs):
Takes (str) file_path, and kwargs:
- (list) members - list of (str) files to be extracted (all files are extracted if None)
- (str) output_dir - directory to place the extracted files
- (str) fork_output_type - output type for resource forks; "visible" for *.rsrc files, "hidden" for ._* files
- (str) fork_output_type - output type for resource forks;
"visible" for *.rsrc files, "hidden" for ._* files
Returns (dict) of extracted and skipped members
"""
members = kwargs.get("members")
@ -39,7 +40,9 @@ def extract_archive(file_path, **kwargs):
if kwargs.get("fork_output_type"):
if kwargs["fork_output_type"] not in FORK_OUTPUT_TYPES:
raise ValueError(f"Argument fork_output_type must be one of: {','.join(FORK_OUTPUT_TYPES)} ")
raise ValueError(
f"Argument fork_output_type must be one of: {','.join(FORK_OUTPUT_TYPES)} "
)
fork_output_type = kwargs["fork_output_type"]
fork_output_type_args = ["-forks", fork_output_type or FORK_OUTPUT_TYPE_VISIBLE]
else:
@ -53,9 +56,9 @@ def extract_archive(file_path, **kwargs):
"-force-skip",
"-no-directory",
*fork_output_type_args,
'--',
"--",
file_path,
]
]
if members:
for member in members:
@ -68,8 +71,10 @@ def extract_archive(file_path, **kwargs):
unar_result_success = r'^Successfully extracted to "(?P<destination>.+)".$'
unar_result_no_files = "No files extracted."
unar_file_extracted = \
r"^ {2}(?P<path>.+). \(((?P<size>\d+) B)?(?P<types>(dir)?(, )?(rsrc)?)\)\.\.\. (?P<status>[A-Z]+)\.$"
unar_file_extracted = (
r"^ {2}(?P<path>.+). \(((?P<size>\d+) B)?(?P<types>(dir)?(, )?"
r"(rsrc)?)\)\.\.\. (?P<status>[A-Z]+)\.$"
)
lines = process["stdout"].rstrip("\n").split("\n")
@ -90,7 +95,7 @@ def extract_archive(file_path, **kwargs):
"is_dir": False,
"is_resource_fork": False,
"absolute_path": str(pathlib.PurePath(tmp_dir).joinpath(matches["path"])),
}
}
member_types = matches.get("types", "")
if member_types.startswith(", "):
@ -112,10 +117,14 @@ def extract_archive(file_path, **kwargs):
member["name"] = f"._{member['name']}"
else:
member["name"] += ".rsrc"
member["path"] = str(pathlib.PurePath(member["path"]).parent.joinpath(member["name"]))
member["absolute_path"] = str(pathlib.PurePath(tmp_dir).joinpath(member["path"]))
member["path"] = str(
pathlib.PurePath(member["path"]).parent.joinpath(member["name"])
)
member["absolute_path"] = str(
pathlib.PurePath(tmp_dir).joinpath(member["path"])
)
logging.debug("Extracted: %s -> %s", member['path'], member['absolute_path'])
logging.debug("Extracted: %s -> %s", member["path"], member["absolute_path"])
extracted_members.append(member)
else:
raise UnarUnexpectedOutputError(f"Unexpected output: {line}")
@ -128,7 +137,10 @@ def extract_archive(file_path, **kwargs):
member["absolute_path"] = str(target_path)
if target_path.exists():
logging.info("Skipping temp file/dir as the target already exists: %s", target_path)
logging.info(
"Skipping temp file/dir as the target already exists: %s",
target_path,
)
skipped.append(member)
continue
@ -147,7 +159,7 @@ def extract_archive(file_path, **kwargs):
return {
"extracted": moved,
"skipped": skipped,
}
}
raise UnarUnexpectedOutputError(lines[-1])
@ -171,37 +183,41 @@ def inspect_archive(file_path):
except JSONDecodeError as error:
raise LsarOutputError(f"Unable to read JSON output from lsar: {error.msg}") from error
members = [{
"name": pathlib.PurePath(member.get("XADFileName")).name,
"path": member.get("XADFileName"),
"size": member.get("XADFileSize"),
"is_dir": member.get("XADIsDirectory"),
"is_resource_fork": member.get("XADIsResourceFork"),
"raw": member,
} for member in archive_info.get("lsarContents", [])]
members = [
{
"name": pathlib.PurePath(member.get("XADFileName")).name,
"path": member.get("XADFileName"),
"size": member.get("XADFileSize"),
"is_dir": member.get("XADIsDirectory"),
"is_resource_fork": member.get("XADIsResourceFork"),
"raw": member,
}
for member in archive_info.get("lsarContents", [])
]
return {
"format": archive_info.get("lsarFormatName"),
"members": members,
}
}
class UnarCommandError(Exception):
""" Command execution was unsuccessful """
"""Command execution was unsuccessful"""
pass
class UnarNoFilesExtractedError(Exception):
""" Command completed, but no files extracted """
"""Command completed, but no files extracted"""
class UnarUnexpectedOutputError(Exception):
""" Command output not recognized """
"""Command output not recognized"""
class LsarCommandError(Exception):
""" Command execution was unsuccessful """
"""Command execution was unsuccessful"""
class LsarOutputError(Exception):
""" Command output could not be parsed"""
"""Command output could not be parsed"""

View File

@ -7,6 +7,7 @@ from ctrlboard_hw.ctrlboard_hw_constants import CtrlBoardHardwareConstants
# pylint: disable=too-few-public-methods
class CtrlboardConfig:
"""Class for central RaSCSI control board configuration parameters"""
ROTATION = 0
WIDTH = 128
HEIGHT = 64

View File

@ -19,8 +19,12 @@ from rascsi_menu_controller import RascsiMenuController
class CtrlBoardMenuUpdateEventHandler(Observer):
"""Class interfacing the menu controller the RaSCSI Control Board hardware."""
def __init__(self, menu_controller: RascsiMenuController, sock_cmd: SocketCmds,
ractl_cmd: RaCtlCmds):
def __init__(
self,
menu_controller: RascsiMenuController,
sock_cmd: SocketCmds,
ractl_cmd: RaCtlCmds,
):
self.message = None
self._menu_controller = menu_controller
self._menu_renderer_config = self._menu_controller.get_menu_renderer().get_config()
@ -73,16 +77,18 @@ class CtrlBoardMenuUpdateEventHandler(Observer):
def handle_button1(self):
"""Method for handling the first cycle button (cycle profiles)"""
if self.rascsi_profile_cycler is None:
self.rascsi_profile_cycler = RascsiProfileCycler(self._menu_controller, self.sock_cmd,
self.ractl_cmd, return_entry=True)
self.rascsi_profile_cycler = RascsiProfileCycler(
self._menu_controller, self.sock_cmd, self.ractl_cmd, return_entry=True
)
else:
self.rascsi_profile_cycler.cycle()
def handle_button2(self):
"""Method for handling the second cycle button (cycle shutdown)"""
if self.rascsi_shutdown_cycler is None:
self.rascsi_shutdown_cycler = RascsiShutdownCycler(self._menu_controller, self.sock_cmd,
self.ractl_cmd)
self.rascsi_shutdown_cycler = RascsiShutdownCycler(
self._menu_controller, self.sock_cmd, self.ractl_cmd
)
else:
self.rascsi_shutdown_cycler.cycle()
@ -100,8 +106,10 @@ class CtrlBoardMenuUpdateEventHandler(Observer):
handler_function(info_object)
except AttributeError:
log = logging.getLogger(__name__)
log.error("Handler function [%s] not found or returned an error. Skipping.",
str(handler_function_name))
log.error(
"Handler function [%s] not found or returned an error. Skipping.",
str(handler_function_name),
)
# noinspection PyUnusedLocal
# pylint: disable=unused-argument
@ -109,9 +117,11 @@ class CtrlBoardMenuUpdateEventHandler(Observer):
"""Method handles the rotary button press with the scsi list to open the action menu."""
context_object = self._menu_controller.get_active_menu().get_current_info_object()
self.context_stack.append(context_object)
self._menu_controller.segue(CtrlBoardMenuBuilder.ACTION_MENU, context_object=context_object,
transition_attributes=self._menu_renderer_config.
transition_attributes_left)
self._menu_controller.segue(
CtrlBoardMenuBuilder.ACTION_MENU,
context_object=context_object,
transition_attributes=self._menu_renderer_config.transition_attributes_left,
)
# noinspection PyUnusedLocal
# pylint: disable=unused-argument
@ -119,9 +129,10 @@ class CtrlBoardMenuUpdateEventHandler(Observer):
"""Method handles the rotary button press to return from the
action menu to the scsi list."""
self.context_stack.pop()
self._menu_controller.segue(CtrlBoardMenuBuilder.SCSI_ID_MENU,
transition_attributes=self._menu_renderer_config.
transition_attributes_right)
self._menu_controller.segue(
CtrlBoardMenuBuilder.SCSI_ID_MENU,
transition_attributes=self._menu_renderer_config.transition_attributes_right,
)
# noinspection PyUnusedLocal
# pylint: disable=unused-argument
@ -129,9 +140,11 @@ class CtrlBoardMenuUpdateEventHandler(Observer):
"""Method handles the rotary button press on attach in the action menu."""
context_object = self._menu_controller.get_active_menu().context_object
self.context_stack.append(context_object)
self._menu_controller.segue(CtrlBoardMenuBuilder.IMAGES_MENU, context_object=context_object,
transition_attributes=self._menu_renderer_config.
transition_attributes_left)
self._menu_controller.segue(
CtrlBoardMenuBuilder.IMAGES_MENU,
context_object=context_object,
transition_attributes=self._menu_renderer_config.transition_attributes_left,
)
# noinspection PyUnusedLocal
def handle_action_menu_slot_detacheject(self, info_object):
@ -139,39 +152,43 @@ class CtrlBoardMenuUpdateEventHandler(Observer):
context_object = self._menu_controller.get_active_menu().context_object
self.detach_eject_scsi_id()
self.context_stack = []
self._menu_controller.segue(CtrlBoardMenuBuilder.SCSI_ID_MENU,
context_object=context_object,
transition_attributes=self._menu_renderer_config.
transition_attributes_right)
self._menu_controller.segue(
CtrlBoardMenuBuilder.SCSI_ID_MENU,
context_object=context_object,
transition_attributes=self._menu_renderer_config.transition_attributes_right,
)
# noinspection PyUnusedLocal
def handle_action_menu_slot_info(self, info_object):
"""Method handles the rotary button press on 'Info' in the action menu."""
context_object = self._menu_controller.get_active_menu().context_object
self.context_stack.append(context_object)
self._menu_controller.segue(CtrlBoardMenuBuilder.DEVICEINFO_MENU,
transition_attributes=self._menu_renderer_config.
transition_attributes_left,
context_object=context_object)
self._menu_controller.segue(
CtrlBoardMenuBuilder.DEVICEINFO_MENU,
transition_attributes=self._menu_renderer_config.transition_attributes_left,
context_object=context_object,
)
# noinspection PyUnusedLocal
def handle_device_info_menu_return(self, info_object):
"""Method handles the rotary button press on 'Return' in the info menu."""
self.context_stack.pop()
context_object = self._menu_controller.get_active_menu().context_object
self._menu_controller.segue(CtrlBoardMenuBuilder.ACTION_MENU,
transition_attributes=self._menu_renderer_config.
transition_attributes_right,
context_object=context_object)
self._menu_controller.segue(
CtrlBoardMenuBuilder.ACTION_MENU,
transition_attributes=self._menu_renderer_config.transition_attributes_right,
context_object=context_object,
)
# noinspection PyUnusedLocal
def handle_action_menu_loadprofile(self, info_object):
"""Method handles the rotary button press on 'Load Profile' in the action menu."""
context_object = self._menu_controller.get_active_menu().context_object
self.context_stack.append(context_object)
self._menu_controller.segue(CtrlBoardMenuBuilder.PROFILES_MENU,
transition_attributes=self._menu_renderer_config.
transition_attributes_left)
self._menu_controller.segue(
CtrlBoardMenuBuilder.PROFILES_MENU,
transition_attributes=self._menu_renderer_config.transition_attributes_left,
)
# noinspection PyUnusedLocal
def handle_profiles_menu_loadprofile(self, info_object):
@ -186,28 +203,31 @@ class CtrlBoardMenuUpdateEventHandler(Observer):
self._menu_controller.show_message("Loading failed!")
self.context_stack = []
self._menu_controller.segue(CtrlBoardMenuBuilder.SCSI_ID_MENU,
transition_attributes=self._menu_renderer_config.
transition_attributes_right)
self._menu_controller.segue(
CtrlBoardMenuBuilder.SCSI_ID_MENU,
transition_attributes=self._menu_renderer_config.transition_attributes_right,
)
# noinspection PyUnusedLocal
def handle_action_menu_shutdown(self, info_object):
"""Method handles the rotary button press on 'Shutdown' in the action menu."""
self.ractl_cmd.shutdown("system")
self._menu_controller.show_message("Shutting down!", 150)
self._menu_controller.segue(CtrlBoardMenuBuilder.SCSI_ID_MENU,
transition_attributes=self._menu_renderer_config.
transition_attributes_right)
self._menu_controller.segue(
CtrlBoardMenuBuilder.SCSI_ID_MENU,
transition_attributes=self._menu_renderer_config.transition_attributes_right,
)
# noinspection PyUnusedLocal
def handle_images_menu_return(self, info_object):
"""Method handles the rotary button press on 'Return' in the image selection menu
(through attach/insert)."""
context_object = self.context_stack.pop()
self._menu_controller.segue(CtrlBoardMenuBuilder.ACTION_MENU,
context_object=context_object,
transition_attributes=self._menu_renderer_config.
transition_attributes_right)
self._menu_controller.segue(
CtrlBoardMenuBuilder.ACTION_MENU,
context_object=context_object,
transition_attributes=self._menu_renderer_config.transition_attributes_right,
)
def handle_images_menu_image_attachinsert(self, info_object):
"""Method handles the rotary button press on an image in the image selection menu
@ -215,10 +235,11 @@ class CtrlBoardMenuUpdateEventHandler(Observer):
context_object = self._menu_controller.get_active_menu().context_object
self.attach_insert_scsi_id(info_object)
self.context_stack = []
self._menu_controller.segue(CtrlBoardMenuBuilder.SCSI_ID_MENU,
context_object=context_object,
transition_attributes=self._menu_renderer_config.
transition_attributes_right)
self._menu_controller.segue(
CtrlBoardMenuBuilder.SCSI_ID_MENU,
context_object=context_object,
transition_attributes=self._menu_renderer_config.transition_attributes_right,
)
def attach_insert_scsi_id(self, info_object):
"""Helper method to attach/insert an image on a scsi id given through the menu context"""
@ -227,9 +248,9 @@ class CtrlBoardMenuUpdateEventHandler(Observer):
context_object = self._menu_controller.get_active_menu().context_object
scsi_id = context_object["scsi_id"]
params = {"file": image_name}
result = self.ractl_cmd.attach_device(scsi_id=scsi_id,
device_type=device_type,
params=params)
result = self.ractl_cmd.attach_device(
scsi_id=scsi_id, device_type=device_type, params=params
)
if result["status"] is False:
self._menu_controller.show_message("Attach failed!")
@ -268,7 +289,10 @@ class CtrlBoardMenuUpdateEventHandler(Observer):
self._menu_controller.show_message("Detach failed!")
else:
log = logging.getLogger(__name__)
log.info("Device type '%s' currently unsupported for detach/eject!", str(device_type))
log.info(
"Device type '%s' currently unsupported for detach/eject!",
str(device_type),
)
def show_id_action_message(self, scsi_id, action: str):
"""Helper method for displaying an action message in the case of an exception."""

View File

@ -8,6 +8,7 @@ from ctrlboard_hw.encoder import Encoder
class CtrlBoardPrintEventHandler(observer.Observer):
"""Class implements a basic event handler that prints button presses from the RaSCSI
Control Board hardware."""
def update(self, updated_object):
if isinstance(updated_object, HardwareButton):
print(updated_object.name + " has been pressed!")

View File

@ -6,8 +6,13 @@ class RascsiShutdownCycler(Cycler):
"""Class implementing the shutdown cycler for the RaSCSI Control Board UI"""
def __init__(self, menu_controller, sock_cmd, ractl_cmd):
super().__init__(menu_controller, sock_cmd, ractl_cmd, return_entry=True,
empty_messages=False)
super().__init__(
menu_controller,
sock_cmd,
ractl_cmd,
return_entry=True,
empty_messages=False,
)
self.executed_once = False
def populate_cycle_entries(self):

View File

@ -16,6 +16,7 @@ from observable import Observable
# pylint: disable=too-many-instance-attributes
class CtrlBoardHardware(Observable):
"""Class implements the RaSCSI Control Board hardware and provides an interface to it."""
def __init__(self, display_i2c_address, pca9554_i2c_address, debounce_ms=200):
self.display_i2c_address = display_i2c_address
self.pca9554_i2c_address = pca9554_i2c_address
@ -33,63 +34,78 @@ class CtrlBoardHardware(Observable):
self.pca_driver = pca9554multiplexer.PCA9554Multiplexer(self.pca9554_i2c_address)
# setup pca9554
self.pca_driver.write_configuration_register_port(CtrlBoardHardwareConstants.
PCA9554_PIN_ENC_A,
PCA9554Multiplexer.PIN_ENABLED_AS_INPUT)
self.pca_driver.write_configuration_register_port(CtrlBoardHardwareConstants.
PCA9554_PIN_ENC_B,
PCA9554Multiplexer.PIN_ENABLED_AS_INPUT)
self.pca_driver.write_configuration_register_port(CtrlBoardHardwareConstants.
PCA9554_PIN_BUTTON_1,
PCA9554Multiplexer.PIN_ENABLED_AS_INPUT)
self.pca_driver.write_configuration_register_port(CtrlBoardHardwareConstants.
PCA9554_PIN_BUTTON_2,
PCA9554Multiplexer.PIN_ENABLED_AS_INPUT)
self.pca_driver.write_configuration_register_port(CtrlBoardHardwareConstants.
PCA9554_PIN_BUTTON_ROTARY,
PCA9554Multiplexer.PIN_ENABLED_AS_INPUT)
self.pca_driver.write_configuration_register_port(CtrlBoardHardwareConstants.
PCA9554_PIN_LED_1,
PCA9554Multiplexer.PIN_ENABLED_AS_OUTPUT)
self.pca_driver.write_configuration_register_port(CtrlBoardHardwareConstants.
PCA9554_PIN_LED_2,
PCA9554Multiplexer.PIN_ENABLED_AS_OUTPUT)
self.pca_driver.write_configuration_register_port(
CtrlBoardHardwareConstants.PCA9554_PIN_ENC_A,
PCA9554Multiplexer.PIN_ENABLED_AS_INPUT,
)
self.pca_driver.write_configuration_register_port(
CtrlBoardHardwareConstants.PCA9554_PIN_ENC_B,
PCA9554Multiplexer.PIN_ENABLED_AS_INPUT,
)
self.pca_driver.write_configuration_register_port(
CtrlBoardHardwareConstants.PCA9554_PIN_BUTTON_1,
PCA9554Multiplexer.PIN_ENABLED_AS_INPUT,
)
self.pca_driver.write_configuration_register_port(
CtrlBoardHardwareConstants.PCA9554_PIN_BUTTON_2,
PCA9554Multiplexer.PIN_ENABLED_AS_INPUT,
)
self.pca_driver.write_configuration_register_port(
CtrlBoardHardwareConstants.PCA9554_PIN_BUTTON_ROTARY,
PCA9554Multiplexer.PIN_ENABLED_AS_INPUT,
)
self.pca_driver.write_configuration_register_port(
CtrlBoardHardwareConstants.PCA9554_PIN_LED_1,
PCA9554Multiplexer.PIN_ENABLED_AS_OUTPUT,
)
self.pca_driver.write_configuration_register_port(
CtrlBoardHardwareConstants.PCA9554_PIN_LED_2,
PCA9554Multiplexer.PIN_ENABLED_AS_OUTPUT,
)
self.input_register_buffer = 0
# pylint: disable=no-member
GPIO.setmode(GPIO.BCM)
GPIO.setup(CtrlBoardHardwareConstants.PI_PIN_INTERRUPT, GPIO.IN)
GPIO.add_event_detect(CtrlBoardHardwareConstants.PI_PIN_INTERRUPT, GPIO.FALLING,
callback=self.button_pressed_callback)
GPIO.add_event_detect(
CtrlBoardHardwareConstants.PI_PIN_INTERRUPT,
GPIO.FALLING,
callback=self.button_pressed_callback,
)
# configure button of the rotary encoder
self.rotary_button = HardwareButton(self.pca_driver,
CtrlBoardHardwareConstants.PCA9554_PIN_BUTTON_ROTARY)
self.rotary_button = HardwareButton(
self.pca_driver, CtrlBoardHardwareConstants.PCA9554_PIN_BUTTON_ROTARY
)
self.rotary_button.state = True
self.rotary_button.name = CtrlBoardHardwareConstants.ROTARY_BUTTON
# configure button 1
self.button1 = HardwareButton(self.pca_driver,
CtrlBoardHardwareConstants.PCA9554_PIN_BUTTON_1)
self.button1 = HardwareButton(
self.pca_driver, CtrlBoardHardwareConstants.PCA9554_PIN_BUTTON_1
)
self.button1.state = True
self.button1.name = CtrlBoardHardwareConstants.BUTTON_1
# configure button 2
self.button2 = HardwareButton(self.pca_driver,
CtrlBoardHardwareConstants.PCA9554_PIN_BUTTON_2)
self.button2 = HardwareButton(
self.pca_driver, CtrlBoardHardwareConstants.PCA9554_PIN_BUTTON_2
)
self.button2.state = True
self.button2.name = CtrlBoardHardwareConstants.BUTTON_2
# configure rotary encoder pin a
self.rotary_a = HardwareButton(self.pca_driver,
CtrlBoardHardwareConstants.PCA9554_PIN_ENC_A)
self.rotary_a = HardwareButton(
self.pca_driver, CtrlBoardHardwareConstants.PCA9554_PIN_ENC_A
)
self.rotary_a.state = True
self.rotary_a.directionalTransition = False
self.rotary_a.name = CtrlBoardHardwareConstants.ROTARY_A
# configure rotary encoder pin b
self.rotary_b = HardwareButton(self.pca_driver,
CtrlBoardHardwareConstants.PCA9554_PIN_ENC_B)
self.rotary_b = HardwareButton(
self.pca_driver, CtrlBoardHardwareConstants.PCA9554_PIN_ENC_B
)
self.rotary_b.state = True
self.rotary_b.directionalTransition = False
self.rotary_b.name = CtrlBoardHardwareConstants.ROTARY_B
@ -117,7 +133,7 @@ class CtrlBoardHardware(Observable):
# ignore button press if debounce time is not reached
if button.last_press is not None:
elapsed = (time.time_ns() - button.last_press)/1000000
elapsed = (time.time_ns() - button.last_press) / 1000000
if elapsed < self.debounce_ms:
return
@ -149,8 +165,8 @@ class CtrlBoardHardware(Observable):
@staticmethod
def button_value_shifted_list(input_register_buffer, bit):
"""Helper method for dealing with multiple buffered input registers"""
input_register_buffer_length = int(len(format(input_register_buffer, 'b'))/8)
shiftval = (input_register_buffer_length-1)*8
input_register_buffer_length = int(len(format(input_register_buffer, "b")) / 8)
shiftval = (input_register_buffer_length - 1) * 8
tmp = input_register_buffer >> shiftval
bitmask = 1 << bit
tmp &= bitmask
@ -162,12 +178,12 @@ class CtrlBoardHardware(Observable):
input_register_buffer = self.input_register_buffer
self.input_register_buffer = 0
input_register_buffer_length = int(len(format(input_register_buffer, 'b'))/8)
input_register_buffer_length = int(len(format(input_register_buffer, "b")) / 8)
if input_register_buffer_length < 1:
return
for i in range(0, input_register_buffer_length):
shiftval = (input_register_buffer_length-1-i)*8
shiftval = (input_register_buffer_length - 1 - i) * 8
input_register = (input_register_buffer >> shiftval) & 0b11111111
rot_a = self.button_value(input_register, 0)
@ -211,7 +227,7 @@ class CtrlBoardHardware(Observable):
if 2 < _address < 120:
try:
_bus.read_byte(_address)
address = '%02x' % _address
address = "%02x" % _address
detected_i2c_addresses.append(int(address, base=16))
except IOError: # simply skip unsuccessful i2c probes
pass
@ -223,8 +239,12 @@ class CtrlBoardHardware(Observable):
the expected i2c addresses are detected."""
# pylint: disable=c-extension-no-member
i2c_addresses = self.detect_i2c_devices(smbus.SMBus(1))
return bool((int(self.display_i2c_address) in i2c_addresses and
(int(self.pca9554_i2c_address) in i2c_addresses)))
return bool(
(
int(self.display_i2c_address) in i2c_addresses
and (int(self.pca9554_i2c_address) in i2c_addresses)
)
)
def detect_display(self):
"""Detects whether an i2c display is connected to the RaSCSI hat."""

View File

@ -4,8 +4,9 @@
# pylint: disable=too-few-public-methods
class CtrlBoardHardwareConstants:
"""Class containing the RaSCSI Control Board hardware constants"""
DISPLAY_I2C_ADDRESS = 0x3c
PCA9554_I2C_ADDRESS = 0x3f
DISPLAY_I2C_ADDRESS = 0x3C
PCA9554_I2C_ADDRESS = 0x3F
PCA9554_PIN_ENC_A = 0
PCA9554_PIN_ENC_B = 1
PCA9554_PIN_BUTTON_1 = 2
@ -14,7 +15,7 @@ class CtrlBoardHardwareConstants:
PCA9554_PIN_LED_1 = 6
PCA9554_PIN_LED_2 = 7
PI_PIN_INTERRUPT = 9 # BCM
PI_PIN_INTERRUPT = 9 # BCM
BUTTON_1 = "Bt1"
BUTTON_2 = "Bt2"

View File

@ -6,6 +6,7 @@ from ctrlboard_hw.hardware_button import HardwareButton
class Encoder:
"""Class implementing a detection mechanism to detect the rotary encoder directions
through the i2c multiplexer + interrupt"""
def __init__(self, enc_a: HardwareButton, enc_b: HardwareButton):
self.enc_a = enc_a
self.enc_b = enc_b
@ -27,16 +28,27 @@ class Encoder:
state |= 0b10000000
# clockwise pattern detection
if (state == 0b11010010 or state == 0b11001000 or state == 0b11011000 or
state == 0b11010001 or state == 0b11011011 or state == 0b11100000 or
state == 0b11001011):
if (
state == 0b11010010
or state == 0b11001000
or state == 0b11011000
or state == 0b11010001
or state == 0b11011011
or state == 0b11100000
or state == 0b11001011
):
self.pos += 1
self.direction = 1
self.state = 0b00000000
return
# counter-clockwise pattern detection
elif (state == 0b11000100 or state == 0b11100100 or state == 0b11100001 or
state == 0b11000111 or state == 0b11100111):
elif (
state == 0b11000100
or state == 0b11100100
or state == 0b11100001
or state == 0b11000111
or state == 0b11100111
):
self.pos -= 1
self.direction = -1
self.state = 0b00000000

View File

@ -19,8 +19,10 @@ class PCA9554Multiplexer:
try:
self.i2c_bus = smbus.SMBus(1)
if self.read_input_register() is None:
logging.error("PCA9554 initialization test on specified i2c address %s failed",
self.i2c_address)
logging.error(
"PCA9554 initialization test on specified i2c address %s failed",
self.i2c_address,
)
self.i2c_bus = None
except IOError:
logging.error("Could not open the i2c bus.")
@ -35,8 +37,9 @@ class PCA9554Multiplexer:
if bit_value:
updated_configuration_register = configuration_register | (1 << port_bit)
else:
updated_configuration_register = configuration_register & (0xFF -
(1 << port_bit))
updated_configuration_register = configuration_register & (
0xFF - (1 << port_bit)
)
self.i2c_bus.write_byte_data(self.i2c_address, 3, updated_configuration_register)
return True

View File

@ -9,6 +9,7 @@ from rascsi.ractl_cmds import RaCtlCmds
class CtrlBoardMenuBuilder(MenuBuilder):
"""Class fgor building the control board UI specific menus"""
SCSI_ID_MENU = "scsi_id_menu"
ACTION_MENU = "action_menu"
IMAGES_MENU = "images_menu"
@ -27,8 +28,12 @@ class CtrlBoardMenuBuilder(MenuBuilder):
def __init__(self, ractl_cmd: RaCtlCmds):
super().__init__()
self._rascsi_client = ractl_cmd
self.file_cmd = FileCmds(sock_cmd=ractl_cmd.sock_cmd, ractl=ractl_cmd,
token=ractl_cmd.token, locale=ractl_cmd.locale)
self.file_cmd = FileCmds(
sock_cmd=ractl_cmd.sock_cmd,
ractl=ractl_cmd,
token=ractl_cmd.token,
locale=ractl_cmd.locale,
)
def build(self, name: str, context_object=None) -> Menu:
if name == CtrlBoardMenuBuilder.SCSI_ID_MENU:
@ -93,9 +98,14 @@ class CtrlBoardMenuBuilder(MenuBuilder):
if device_type != "":
menu_str += " [" + device_type + "]"
menu.add_entry(menu_str, {"context": self.SCSI_ID_MENU,
"action": self.ACTION_OPENACTIONMENU,
"scsi_id": scsi_id})
menu.add_entry(
menu_str,
{
"context": self.SCSI_ID_MENU,
"action": self.ACTION_OPENACTIONMENU,
"scsi_id": scsi_id,
},
)
return menu
@ -103,18 +113,30 @@ class CtrlBoardMenuBuilder(MenuBuilder):
def create_action_menu(self, context_object=None):
"""Method creates the action submenu with action that can be performed on a scsi slot"""
menu = Menu(CtrlBoardMenuBuilder.ACTION_MENU)
menu.add_entry("Return", {"context": self.ACTION_MENU,
"action": self.ACTION_RETURN})
menu.add_entry("Attach/Insert", {"context": self.ACTION_MENU,
"action": self.ACTION_SLOT_ATTACHINSERT})
menu.add_entry("Detach/Eject", {"context": self.ACTION_MENU,
"action": self.ACTION_SLOT_DETACHEJECT})
menu.add_entry("Info", {"context": self.ACTION_MENU,
"action": self.ACTION_SLOT_INFO})
menu.add_entry("Load Profile", {"context": self.ACTION_MENU,
"action": self.ACTION_LOADPROFILE})
menu.add_entry("Shutdown", {"context": self.ACTION_MENU,
"action": self.ACTION_SHUTDOWN})
menu.add_entry(
"Return",
{"context": self.ACTION_MENU, "action": self.ACTION_RETURN},
)
menu.add_entry(
"Attach/Insert",
{"context": self.ACTION_MENU, "action": self.ACTION_SLOT_ATTACHINSERT},
)
menu.add_entry(
"Detach/Eject",
{"context": self.ACTION_MENU, "action": self.ACTION_SLOT_DETACHEJECT},
)
menu.add_entry(
"Info",
{"context": self.ACTION_MENU, "action": self.ACTION_SLOT_INFO},
)
menu.add_entry(
"Load Profile",
{"context": self.ACTION_MENU, "action": self.ACTION_LOADPROFILE},
)
menu.add_entry(
"Shutdown",
{"context": self.ACTION_MENU, "action": self.ACTION_SHUTDOWN},
)
return menu
def create_images_menu(self, context_object=None):
@ -123,12 +145,15 @@ class CtrlBoardMenuBuilder(MenuBuilder):
images_info = self.file_cmd.list_images()
menu.add_entry("Return", {"context": self.IMAGES_MENU, "action": self.ACTION_RETURN})
images = images_info["files"]
sorted_images = sorted(images, key=lambda d: d['name'])
sorted_images = sorted(images, key=lambda d: d["name"])
for image in sorted_images:
image_str = image["name"] + " [" + image["detected_type"] + "]"
image_context = {"context": self.IMAGES_MENU, "name": str(image["name"]),
"device_type": str(image["detected_type"]),
"action": self.ACTION_IMAGE_ATTACHINSERT}
image_context = {
"context": self.IMAGES_MENU,
"name": str(image["name"]),
"device_type": str(image["detected_type"]),
"action": self.ACTION_IMAGE_ATTACHINSERT,
}
menu.add_entry(image_str, image_context)
return menu
@ -138,9 +163,14 @@ class CtrlBoardMenuBuilder(MenuBuilder):
menu.add_entry("Return", {"context": self.IMAGES_MENU, "action": self.ACTION_RETURN})
config_files = self.file_cmd.list_config_files()
for config_file in config_files:
menu.add_entry(str(config_file),
{"context": self.PROFILES_MENU, "name": str(config_file),
"action": self.ACTION_LOADPROFILE})
menu.add_entry(
str(config_file),
{
"context": self.PROFILES_MENU,
"name": str(config_file),
"action": self.ACTION_LOADPROFILE,
},
)
return menu

View File

@ -6,14 +6,17 @@ import logging
from config import CtrlboardConfig
from ctrlboard_hw.ctrlboard_hw import CtrlBoardHardware
from ctrlboard_hw.ctrlboard_hw_constants import CtrlBoardHardwareConstants
from ctrlboard_event_handler.ctrlboard_menu_update_event_handler \
import CtrlBoardMenuUpdateEventHandler
from ctrlboard_event_handler.ctrlboard_menu_update_event_handler import (
CtrlBoardMenuUpdateEventHandler,
)
from ctrlboard_menu_builder import CtrlBoardMenuBuilder
from menu.menu_renderer_config import MenuRendererConfig
from menu.menu_renderer_luma_oled import MenuRendererLumaOled
from rascsi.exceptions import (EmptySocketChunkException,
InvalidProtobufResponse,
FailedSocketConnectionException)
from rascsi.exceptions import (
EmptySocketChunkException,
InvalidProtobufResponse,
FailedSocketConnectionException,
)
from rascsi.ractl_cmds import RaCtlCmds
from rascsi.socket_cmds import SocketCmds
@ -23,7 +26,7 @@ from rascsi_menu_controller import RascsiMenuController
def parse_config():
"""Parses the command line parameters and configured the RaSCSI Control Board UI accordingly"""
config = CtrlboardConfig()
cmdline_args_parser = argparse.ArgumentParser(description='RaSCSI ctrlboard service')
cmdline_args_parser = argparse.ArgumentParser(description="RaSCSI ctrlboard service")
cmdline_args_parser.add_argument(
"--rotation",
type=int,
@ -68,7 +71,7 @@ def parse_config():
default=logging.WARN,
action="store",
help="Loglevel. Valid values: 0 (notset), 10 (debug), 30 (warning), "
"40 (error), 50 (critical). Default: Warning",
"40 (error), 50 (critical). Default: Warning",
)
cmdline_args_parser.add_argument(
"--transitions",
@ -115,14 +118,14 @@ def main():
config = parse_config()
log_format = "%(asctime)s:%(name)s:%(levelname)s - %(message)s"
logging.basicConfig(stream=sys.stdout,
format=log_format,
level=config.LOG_LEVEL)
logging.basicConfig(stream=sys.stdout, format=log_format, level=config.LOG_LEVEL)
log = logging.getLogger(__name__)
log.debug("RaSCSI ctrlboard service started.")
ctrlboard_hw = CtrlBoardHardware(display_i2c_address=config.DISPLAY_I2C_ADDRESS,
pca9554_i2c_address=config.PCA9554_I2C_ADDRESS)
ctrlboard_hw = CtrlBoardHardware(
display_i2c_address=config.DISPLAY_I2C_ADDRESS,
pca9554_i2c_address=config.PCA9554_I2C_ADDRESS,
)
# for now, we require the complete rascsi ctrlboard hardware.
# Oled only will be supported as well at some later point in time.
@ -143,8 +146,10 @@ def main():
exit(1)
if check_rascsi_connection(ractl_cmd) is False:
log.error("Communication with RaSCSI failed. Please check if password token must be set "
"and whether is set correctly.")
log.error(
"Communication with RaSCSI failed. Please check if password token must be set "
"and whether is set correctly."
)
exit(1)
menu_renderer_config = MenuRendererConfig()
@ -156,18 +161,21 @@ def main():
menu_renderer_config.rotation = config.ROTATION
menu_builder = CtrlBoardMenuBuilder(ractl_cmd)
menu_controller = RascsiMenuController(config.MENU_REFRESH_INTERVAL, menu_builder=menu_builder,
menu_renderer=MenuRendererLumaOled(menu_renderer_config),
menu_renderer_config=menu_renderer_config)
menu_controller = RascsiMenuController(
config.MENU_REFRESH_INTERVAL,
menu_builder=menu_builder,
menu_renderer=MenuRendererLumaOled(menu_renderer_config),
menu_renderer_config=menu_renderer_config,
)
menu_controller.add(CtrlBoardMenuBuilder.SCSI_ID_MENU)
menu_controller.add(CtrlBoardMenuBuilder.ACTION_MENU)
menu_controller.show_splash_screen(f"resources/splash_start_64.bmp")
menu_controller.show_splash_screen("resources/splash_start_64.bmp")
menu_update_event_handler = CtrlBoardMenuUpdateEventHandler(menu_controller,
sock_cmd=sock_cmd,
ractl_cmd=ractl_cmd)
menu_update_event_handler = CtrlBoardMenuUpdateEventHandler(
menu_controller, sock_cmd=sock_cmd, ractl_cmd=ractl_cmd
)
ctrlboard_hw.attach(menu_update_event_handler)
menu_controller.set_active_menu(CtrlBoardMenuBuilder.SCSI_ID_MENU)
@ -184,5 +192,5 @@ def main():
print(ex)
if __name__ == '__main__':
if __name__ == "__main__":
main()

View File

@ -5,6 +5,7 @@ from menu.screensaver import ScreenSaver
class BlankScreenSaver(ScreenSaver):
"""Class implementing a blank screen safer that simply blanks the screen after a
configured activation delay"""
def __init__(self, activation_delay, menu_renderer):
super().__init__(activation_delay, menu_renderer)
self._initial_draw_call = None

View File

@ -8,9 +8,17 @@ class Cycler:
"""Class implementing button cycling functionality. Message is shown at the center of
the screen where repeated button presses cycle through the available selection
possibilities. Inactivity (cycle_timeout) actives cycle entry last shown on the screen."""
def __init__(self, menu_controller, sock_cmd, ractl_cmd,
cycle_timeout=3, return_string="Return ->",
return_entry=True, empty_messages=True):
def __init__(
self,
menu_controller,
sock_cmd,
ractl_cmd,
cycle_timeout=3,
return_string="Return ->",
return_entry=True,
empty_messages=True,
):
self._cycle_profile_timer_flag = Timer(activation_delay=cycle_timeout)
self._menu_controller = menu_controller
self.sock_cmd = sock_cmd
@ -39,7 +47,7 @@ class Cycler:
"""Perform the return action, i.e., when no selection is chosen"""
def update(self):
""" Returns True if object has completed its task and can be deleted """
"""Returns True if object has completed its task and can be deleted"""
if self._cycle_profile_timer_flag is None:
return None

View File

@ -4,6 +4,7 @@ from typing import List
class Menu:
"""Class implement the Menu class"""
def __init__(self, name: str):
self.entries: List = []
self.item_selection = 0
@ -17,11 +18,11 @@ class Menu:
def get_current_text(self):
"""Returns the text content of the currently selected text in the menu."""
return self.entries[self.item_selection]['text']
return self.entries[self.item_selection]["text"]
def get_current_info_object(self):
"""Returns the data object to the currently selected menu item"""
return self.entries[self.item_selection]['data_object']
return self.entries[self.item_selection]["data_object"]
def __repr__(self):
print("entries: " + str(self.entries))

View File

@ -6,6 +6,7 @@ from menu.menu import Menu
# pylint: disable=too-few-public-methods
class MenuBuilder(ABC):
"""Base class for menu builders"""
def __init__(self):
pass

View File

@ -18,6 +18,7 @@ from menu.screensaver import ScreenSaver
class MenuRenderer(ABC):
"""The abstract menu renderer class provides the base for concrete menu
renderer classes that implement functionality based on conrete hardware or available APIs."""
def __init__(self, config: MenuRendererConfig):
self.message = ""
self.mini_message = ""
@ -25,7 +26,7 @@ class MenuRenderer(ABC):
self._config = config
self.disp = self.display_init()
self.image = Image.new('1', (self.disp.width, self.disp.height))
self.image = Image.new("1", (self.disp.width, self.disp.height))
self.draw = ImageDraw.Draw(self.image)
self.font = ImageFont.truetype(config.font_path, size=config.font_size)
# just a sample text to work with the font height
@ -83,14 +84,21 @@ class MenuRenderer(ABC):
def draw_row(self, row_number: int, text: str, selected: bool):
"""Draws a single row of the menu."""
x_pos = 0
y_pos = row_number*self.font_height
y_pos = row_number * self.font_height
if selected:
selection_extension = 0
if row_number < self.rows_per_screen():
selection_extension = self._config.row_selection_pixel_extension
self.draw.rectangle((x_pos, y_pos, self.disp.width,
y_pos+self._config.font_size+selection_extension),
outline=0, fill=255)
self.draw.rectangle(
(
x_pos,
y_pos,
self.disp.width,
y_pos + self._config.font_size + selection_extension,
),
outline=0,
fill=255,
)
# in stage 1, we initialize scrolling for the currently selected line
if self._perform_scrolling_stage == 1:
@ -99,9 +107,9 @@ class MenuRenderer(ABC):
# in stage 2, we know the details and can thus perform the scrolling to the left
if self._perform_scrolling_stage == 2:
if self._current_line_horizontal_overlap+self._x_scrolling > 0:
if self._current_line_horizontal_overlap + self._x_scrolling > 0:
self._x_scrolling -= 1
if self._current_line_horizontal_overlap+self._x_scrolling == 0:
if self._current_line_horizontal_overlap + self._x_scrolling == 0:
self._stage_timestamp = int(time.time())
self._perform_scrolling_stage = 3
@ -115,11 +123,12 @@ class MenuRenderer(ABC):
# in stage 4, we scroll back to the right
if self._perform_scrolling_stage == 4:
if self._current_line_horizontal_overlap+self._x_scrolling >= 0:
if self._current_line_horizontal_overlap + self._x_scrolling >= 0:
self._x_scrolling += 1
if (self._current_line_horizontal_overlap +
self._x_scrolling) == self._current_line_horizontal_overlap:
if (
self._current_line_horizontal_overlap + self._x_scrolling
) == self._current_line_horizontal_overlap:
self._stage_timestamp = int(time.time())
self._perform_scrolling_stage = 5
@ -131,8 +140,14 @@ class MenuRenderer(ABC):
self._stage_timestamp = None
self._perform_scrolling_stage = 2
self.draw.text((x_pos+self._x_scrolling, y_pos), text, font=self.font,
spacing=0, stroke_fill=0, fill=0)
self.draw.text(
(x_pos + self._x_scrolling, y_pos),
text,
font=self.font,
spacing=0,
stroke_fill=0,
fill=0,
)
else:
self.draw.text((x_pos, y_pos), text, font=self.font, spacing=0, stroke_fill=0, fill=255)
@ -143,8 +158,15 @@ class MenuRenderer(ABC):
centered_height = (self.disp.height - font_height) / 2
self.draw.rectangle((0, 0, self.disp.width, self.disp.height), outline=0, fill=255)
self.draw.text((centered_width, centered_height), text, align="center", font=self.font,
stroke_fill=0, fill=0, textsize=20)
self.draw.text(
(centered_width, centered_height),
text,
align="center",
font=self.font,
stroke_fill=0,
fill=0,
textsize=20,
)
def draw_mini_message(self, text: str):
"""Draws a fullscreen message, i.e., a message covering only the center portion of
@ -153,16 +175,33 @@ class MenuRenderer(ABC):
centered_width = (self.disp.width - font_width) / 2
centered_height = (self.disp.height - self.font_height) / 2
self.draw.rectangle((0, centered_height-4, self.disp.width,
centered_height+self.font_height+4), outline=0, fill=255)
self.draw.text((centered_width, centered_height), text, align="center", font=self.font,
stroke_fill=0, fill=0, textsize=20)
self.draw.rectangle(
(
0,
centered_height - 4,
self.disp.width,
centered_height + self.font_height + 4,
),
outline=0,
fill=255,
)
self.draw.text(
(centered_width, centered_height),
text,
align="center",
font=self.font,
stroke_fill=0,
fill=0,
textsize=20,
)
def draw_menu(self):
"""Method draws the menu set to the class instance."""
if self._menu.item_selection >= self.frame_start_row + self.rows_per_screen():
if self._config.scroll_behavior == "page":
self.frame_start_row = self.frame_start_row + (round(self.rows_per_screen()/2)) + 1
self.frame_start_row = (
self.frame_start_row + (round(self.rows_per_screen() / 2)) + 1
)
if self.frame_start_row > len(self._menu.entries) - self.rows_per_screen():
self.frame_start_row = len(self._menu.entries) - self.rows_per_screen()
else: # extend as default behavior
@ -170,13 +209,15 @@ class MenuRenderer(ABC):
if self._menu.item_selection < self.frame_start_row:
if self._config.scroll_behavior == "page":
self.frame_start_row = self.frame_start_row - (round(self.rows_per_screen()/2)) - 1
self.frame_start_row = (
self.frame_start_row - (round(self.rows_per_screen() / 2)) - 1
)
if self.frame_start_row < 0:
self.frame_start_row = 0
else: # extend as default behavior
self.frame_start_row = self._menu.item_selection
self.draw_menu_frame(self.frame_start_row, self.frame_start_row+self.rows_per_screen())
self.draw_menu_frame(self.frame_start_row, self.frame_start_row + self.rows_per_screen())
def draw_menu_frame(self, frame_start_row: int, frame_end_row: int):
"""Draws row frame_start_row to frame_end_row of the class instance menu, i.e., it

View File

@ -11,8 +11,9 @@ class MenuRendererAdafruitSSD1306(MenuRenderer):
def display_init(self):
i2c = busio.I2C(SCL, SDA)
self.disp = adafruit_ssd1306.SSD1306_I2C(self._config.width, self._config.height, i2c,
addr=self._config.i2c_address)
self.disp = adafruit_ssd1306.SSD1306_I2C(
self._config.width, self._config.height, i2c, addr=self._config.i2c_address
)
self.disp.rotation = self._config.get_mapped_rotation()
self.disp.fill(0)
self.disp.show()

View File

@ -5,20 +5,16 @@
class MenuRendererConfig:
"""Class for configuring menu renderer instances. Provides configuration options
such as width, height, i2c address, font, transitions, etc."""
_rotation_mapper = {
0: 0,
90: 1,
180: 2,
270: 3
}
_rotation_mapper = {0: 0, 90: 1, 180: 2, 270: 3}
def __init__(self):
self.width = 128
self.height = 64
self.i2c_address = 0x3c
self.i2c_address = 0x3C
self.i2c_port = 1
self.display_type = "ssd1306" # luma-oled supported devices, "sh1106", "ssd1306", ...
self.font_path = 'resources/DejaVuSansMono-Bold.ttf'
self.font_path = "resources/DejaVuSansMono-Bold.ttf"
self.font_size = 12
self.row_selection_pixel_extension = 2
self.scroll_behavior = "page" # "extend" or "page"

View File

@ -5,14 +5,19 @@ from menu.menu_renderer import MenuRenderer
class MenuRendererLumaOled(MenuRenderer):
"""Class implementing the luma oled menu renderer"""
def display_init(self):
serial = i2c(port=self._config.i2c_port, address=self._config.i2c_address)
import luma.oled.device
device = getattr(luma.oled.device, self._config.display_type)
self.disp = device(serial_interface=serial, width=self._config.width,
height=self._config.height,
rotate=self._config.get_mapped_rotation())
self.disp = device(
serial_interface=serial,
width=self._config.width,
height=self._config.height,
rotate=self._config.get_mapped_rotation(),
)
self.disp.clear()
self.disp.show()

View File

@ -5,6 +5,7 @@ import time
class Timer:
"""Class implementing a timer class. Takes an activation delay and
sets a flag if the activation delay exprires."""
def __init__(self, activation_delay):
self.start_timestamp = int(time.time())
self.activation_delay = activation_delay

View File

@ -17,6 +17,7 @@ class Transition:
class PushTransition(Transition):
"""Class implementing a push left/right transition."""
PUSH_LEFT_TRANSITION = "push_left"
PUSH_RIGHT_TRANSITION = "push_right"
@ -32,7 +33,7 @@ class PushTransition(Transition):
if transition_attributes is not None and transition_attributes != {}:
direction = transition_attributes["direction"]
transition_image = Image.new('1', (self.disp.width, self.disp.height))
transition_image = Image.new("1", (self.disp.width, self.disp.height))
if direction == PushTransition.PUSH_LEFT_TRANSITION:
self.perform_push_left(end_image, start_image, transition_image)
@ -57,8 +58,8 @@ class PushTransition(Transition):
"""Implements a push right transition. Is called by perform depending on the transition
attribute 'direction'."""
for x_pos in range(0, 128, self.transition_attributes["transition_speed"]):
left_region = start_image.crop((0, 0, 128-x_pos, 64))
right_region = end_image.crop((128-x_pos, 0, 128, 64))
left_region = start_image.crop((0, 0, 128 - x_pos, 64))
right_region = end_image.crop((128 - x_pos, 0, 128, 64))
transition_image.paste(left_region, (x_pos, 0, 128, 64))
transition_image.paste(right_region, (0, 0, x_pos, 64))
self.disp.display(transition_image)

View File

@ -5,6 +5,7 @@ from observer import Observer
class Observable:
"""Class implementing the Observable pattern"""
_observers: List[Observer] = []
def attach(self, observer: Observer):

View File

@ -5,6 +5,7 @@ from abc import ABC, abstractmethod
# pylint: disable=too-few-public-methods
class Observer(ABC):
"""Class implementing an abserver"""
@abstractmethod
def update(self, updated_object) -> None:
"""Abstract method for updating an observer. Needs to be implemented by subclasses."""

View File

@ -9,8 +9,13 @@ from menu.timer import Timer
class RascsiMenuController(MenuController):
"""Class implementing a RaSCSI Control Board UI specific menu controller"""
def __init__(self, refresh_interval, menu_builder: MenuBuilder,
menu_renderer=None, menu_renderer_config=None):
def __init__(
self,
refresh_interval,
menu_builder: MenuBuilder,
menu_renderer=None,
menu_renderer_config=None,
):
super().__init__(menu_builder, menu_renderer, menu_renderer_config)
self._refresh_interval = refresh_interval
self._menu_renderer: MenuRenderer = menu_renderer

View File

@ -66,91 +66,175 @@ rascsi_none = -1
# Matrix showing all of the SCSI signals, along what signal they're looped back to.
# dir_ctrl indicates which direction control pin is associated with that output
gpio_map = [
{ 'gpio_num': scsi_d0_gpio, 'attached_to': scsi_ack_gpio, 'dir_ctrl': rascsi_dtd_gpio},
{ 'gpio_num': scsi_d1_gpio, 'attached_to': scsi_sel_gpio, 'dir_ctrl': rascsi_dtd_gpio},
{ 'gpio_num': scsi_d2_gpio, 'attached_to': scsi_atn_gpio, 'dir_ctrl': rascsi_dtd_gpio},
{ 'gpio_num': scsi_d3_gpio, 'attached_to': scsi_rst_gpio, 'dir_ctrl': rascsi_dtd_gpio},
{ 'gpio_num': scsi_d4_gpio, 'attached_to': scsi_cd_gpio, 'dir_ctrl': rascsi_dtd_gpio},
{ 'gpio_num': scsi_d5_gpio, 'attached_to': scsi_io_gpio, 'dir_ctrl': rascsi_dtd_gpio},
{ 'gpio_num': scsi_d6_gpio, 'attached_to': scsi_msg_gpio, 'dir_ctrl': rascsi_dtd_gpio},
{ 'gpio_num': scsi_d7_gpio, 'attached_to': scsi_req_gpio, 'dir_ctrl': rascsi_dtd_gpio},
{ 'gpio_num': scsi_dp_gpio, 'attached_to': scsi_bsy_gpio, 'dir_ctrl': rascsi_dtd_gpio},
{ 'gpio_num': scsi_atn_gpio, 'attached_to': scsi_d2_gpio, 'dir_ctrl': rascsi_ind_gpio},
{ 'gpio_num': scsi_rst_gpio, 'attached_to': scsi_d3_gpio, 'dir_ctrl': rascsi_ind_gpio},
{ 'gpio_num': scsi_ack_gpio, 'attached_to': scsi_d0_gpio, 'dir_ctrl': rascsi_ind_gpio},
{ 'gpio_num': scsi_req_gpio, 'attached_to': scsi_d7_gpio, 'dir_ctrl': rascsi_tad_gpio},
{ 'gpio_num': scsi_msg_gpio, 'attached_to': scsi_d6_gpio, 'dir_ctrl': rascsi_tad_gpio},
{ 'gpio_num': scsi_cd_gpio, 'attached_to': scsi_d4_gpio, 'dir_ctrl': rascsi_tad_gpio},
{ 'gpio_num': scsi_io_gpio, 'attached_to': scsi_d5_gpio, 'dir_ctrl': rascsi_tad_gpio},
{ 'gpio_num': scsi_bsy_gpio, 'attached_to': scsi_dp_gpio, 'dir_ctrl': rascsi_tad_gpio},
{ 'gpio_num': scsi_sel_gpio, 'attached_to': scsi_d1_gpio, 'dir_ctrl': rascsi_ind_gpio},
{
"gpio_num": scsi_d0_gpio,
"attached_to": scsi_ack_gpio,
"dir_ctrl": rascsi_dtd_gpio,
},
{
"gpio_num": scsi_d1_gpio,
"attached_to": scsi_sel_gpio,
"dir_ctrl": rascsi_dtd_gpio,
},
{
"gpio_num": scsi_d2_gpio,
"attached_to": scsi_atn_gpio,
"dir_ctrl": rascsi_dtd_gpio,
},
{
"gpio_num": scsi_d3_gpio,
"attached_to": scsi_rst_gpio,
"dir_ctrl": rascsi_dtd_gpio,
},
{
"gpio_num": scsi_d4_gpio,
"attached_to": scsi_cd_gpio,
"dir_ctrl": rascsi_dtd_gpio,
},
{
"gpio_num": scsi_d5_gpio,
"attached_to": scsi_io_gpio,
"dir_ctrl": rascsi_dtd_gpio,
},
{
"gpio_num": scsi_d6_gpio,
"attached_to": scsi_msg_gpio,
"dir_ctrl": rascsi_dtd_gpio,
},
{
"gpio_num": scsi_d7_gpio,
"attached_to": scsi_req_gpio,
"dir_ctrl": rascsi_dtd_gpio,
},
{
"gpio_num": scsi_dp_gpio,
"attached_to": scsi_bsy_gpio,
"dir_ctrl": rascsi_dtd_gpio,
},
{
"gpio_num": scsi_atn_gpio,
"attached_to": scsi_d2_gpio,
"dir_ctrl": rascsi_ind_gpio,
},
{
"gpio_num": scsi_rst_gpio,
"attached_to": scsi_d3_gpio,
"dir_ctrl": rascsi_ind_gpio,
},
{
"gpio_num": scsi_ack_gpio,
"attached_to": scsi_d0_gpio,
"dir_ctrl": rascsi_ind_gpio,
},
{
"gpio_num": scsi_req_gpio,
"attached_to": scsi_d7_gpio,
"dir_ctrl": rascsi_tad_gpio,
},
{
"gpio_num": scsi_msg_gpio,
"attached_to": scsi_d6_gpio,
"dir_ctrl": rascsi_tad_gpio,
},
{
"gpio_num": scsi_cd_gpio,
"attached_to": scsi_d4_gpio,
"dir_ctrl": rascsi_tad_gpio,
},
{
"gpio_num": scsi_io_gpio,
"attached_to": scsi_d5_gpio,
"dir_ctrl": rascsi_tad_gpio,
},
{
"gpio_num": scsi_bsy_gpio,
"attached_to": scsi_dp_gpio,
"dir_ctrl": rascsi_tad_gpio,
},
{
"gpio_num": scsi_sel_gpio,
"attached_to": scsi_d1_gpio,
"dir_ctrl": rascsi_ind_gpio,
},
]
# List of all of the SCSI signals that is also a dictionary to their human readable name
scsi_signals = {
scsi_d0_gpio: 'D0',
scsi_d1_gpio: 'D1',
scsi_d2_gpio: 'D2',
scsi_d3_gpio: 'D3',
scsi_d4_gpio: 'D4',
scsi_d5_gpio: 'D5',
scsi_d6_gpio: 'D6',
scsi_d7_gpio: 'D7',
scsi_dp_gpio: 'DP',
scsi_atn_gpio: 'ATN',
scsi_rst_gpio: 'RST',
scsi_ack_gpio: 'ACK',
scsi_req_gpio: 'REQ',
scsi_msg_gpio: 'MSG',
scsi_cd_gpio: 'CD',
scsi_io_gpio: 'IO',
scsi_bsy_gpio: 'BSY',
scsi_sel_gpio: 'SEL'
scsi_d0_gpio: "D0",
scsi_d1_gpio: "D1",
scsi_d2_gpio: "D2",
scsi_d3_gpio: "D3",
scsi_d4_gpio: "D4",
scsi_d5_gpio: "D5",
scsi_d6_gpio: "D6",
scsi_d7_gpio: "D7",
scsi_dp_gpio: "DP",
scsi_atn_gpio: "ATN",
scsi_rst_gpio: "RST",
scsi_ack_gpio: "ACK",
scsi_req_gpio: "REQ",
scsi_msg_gpio: "MSG",
scsi_cd_gpio: "CD",
scsi_io_gpio: "IO",
scsi_bsy_gpio: "BSY",
scsi_sel_gpio: "SEL",
}
# Debug function that just dumps the status of all of the scsi signals to the console
def print_all():
for cur_gpio in gpio_map:
print(cur_gpio['name']+"="+str(gpio.input(cur_gpio['gpio_num'])) + " ", end='', flush=True)
print(
cur_gpio["name"] + "=" + str(gpio.input(cur_gpio["gpio_num"])) + " ",
end="",
flush=True,
)
print("")
# Set transceivers IC1 and IC2 to OUTPUT
def set_dtd_out():
gpio.output(rascsi_dtd_gpio,gpio.LOW)
gpio.output(rascsi_dtd_gpio, gpio.LOW)
# Set transceivers IC1 and IC2 to INPUT
def set_dtd_in():
gpio.output(rascsi_dtd_gpio,gpio.HIGH)
gpio.output(rascsi_dtd_gpio, gpio.HIGH)
# Set transceiver IC4 to OUTPUT
def set_ind_out():
gpio.output(rascsi_ind_gpio,gpio.HIGH)
gpio.output(rascsi_ind_gpio, gpio.HIGH)
# Set transceiver IC4 to INPUT
def set_ind_in():
gpio.output(rascsi_ind_gpio,gpio.LOW)
gpio.output(rascsi_ind_gpio, gpio.LOW)
# Set transceiver IC3 to OUTPUT
def set_tad_out():
gpio.output(rascsi_tad_gpio,gpio.HIGH)
gpio.output(rascsi_tad_gpio, gpio.HIGH)
# Set transceiver IC3 to INPUT
def set_tad_in():
gpio.output(rascsi_tad_gpio,gpio.LOW)
gpio.output(rascsi_tad_gpio, gpio.LOW)
# Set the specified transciever to an OUTPUT. All of the other transceivers
# will be set to inputs. If a non-existent direction gpio is specified, this
# will set all of the transceivers to inputs.
def set_output_channel(out_gpio):
if(out_gpio == rascsi_tad_gpio):
if out_gpio == rascsi_tad_gpio:
set_tad_out()
else:
set_tad_in()
if(out_gpio == rascsi_dtd_gpio):
if out_gpio == rascsi_dtd_gpio:
set_dtd_out()
else:
set_dtd_in()
if(out_gpio == rascsi_ind_gpio):
if out_gpio == rascsi_ind_gpio:
set_ind_out()
else:
set_ind_in()
@ -161,11 +245,11 @@ def set_output_channel(out_gpio):
def test_gpio_pin(gpio_rec):
global err_count
set_output_channel(gpio_rec['dir_ctrl'])
set_output_channel(gpio_rec["dir_ctrl"])
############################################
# set the test gpio low
gpio.output(gpio_rec['gpio_num'], gpio.LOW)
gpio.output(gpio_rec["gpio_num"], gpio.LOW)
time.sleep(pin_settle_delay)
@ -173,18 +257,34 @@ def test_gpio_pin(gpio_rec):
for cur_gpio in scsi_signals:
# all of the gpios should be high except for the test gpio and the connected gpio
cur_val = gpio.input(cur_gpio)
if( cur_gpio == gpio_rec['gpio_num']):
if(cur_val != gpio.LOW):
print("Error: Test commanded GPIO " + scsi_signals[gpio_rec['gpio_num']] + " to be low, but it did not respond")
err_count = err_count+1
elif (cur_gpio == gpio_rec['attached_to']):
if(cur_val != gpio.LOW):
print("Error: GPIO " + scsi_signals[gpio_rec['gpio_num']] + " should drive " + scsi_signals[gpio_rec['attached_to']] + " low, but did not")
err_count = err_count+1
if cur_gpio == gpio_rec["gpio_num"]:
if cur_val != gpio.LOW:
print(
"Error: Test commanded GPIO "
+ scsi_signals[gpio_rec["gpio_num"]]
+ " to be low, but it did not respond"
)
err_count = err_count + 1
elif cur_gpio == gpio_rec["attached_to"]:
if cur_val != gpio.LOW:
print(
"Error: GPIO "
+ scsi_signals[gpio_rec["gpio_num"]]
+ " should drive "
+ scsi_signals[gpio_rec["attached_to"]]
+ " low, but did not"
)
err_count = err_count + 1
else:
if(cur_val != gpio.HIGH):
print("Error: GPIO " + scsi_signals[gpio_rec['gpio_num']] + " incorrectly pulled " + scsi_signals[cur_gpio] + " LOW, when it shouldn't have")
err_count = err_count+1
if cur_val != gpio.HIGH:
print(
"Error: GPIO "
+ scsi_signals[gpio_rec["gpio_num"]]
+ " incorrectly pulled "
+ scsi_signals[cur_gpio]
+ " LOW, when it shouldn't have"
)
err_count = err_count + 1
############################################
# set the transceivers to input
@ -196,22 +296,31 @@ def test_gpio_pin(gpio_rec):
for cur_gpio in scsi_signals:
# all of the gpios should be high except for the test gpio
cur_val = gpio.input(cur_gpio)
if( cur_gpio == gpio_rec['gpio_num']):
if(cur_val != gpio.LOW):
print("Error: Test commanded GPIO " + scsi_signals[gpio_rec['gpio_num']] + " to be low, but it did not respond")
err_count = err_count+1
if cur_gpio == gpio_rec["gpio_num"]:
if cur_val != gpio.LOW:
print(
"Error: Test commanded GPIO "
+ scsi_signals[gpio_rec["gpio_num"]]
+ " to be low, but it did not respond"
)
err_count = err_count + 1
else:
if(cur_val != gpio.HIGH):
print("Error: GPIO " + scsi_signals[gpio_rec['gpio_num']] + " incorrectly pulled " + scsi_signals[cur_gpio] + " LOW, when it shouldn't have")
err_count = err_count+1
if cur_val != gpio.HIGH:
print(
"Error: GPIO "
+ scsi_signals[gpio_rec["gpio_num"]]
+ " incorrectly pulled "
+ scsi_signals[cur_gpio]
+ " LOW, when it shouldn't have"
)
err_count = err_count + 1
# Set the transceiver back to output
set_output_channel(gpio_rec['dir_ctrl'])
set_output_channel(gpio_rec["dir_ctrl"])
#############################################
# set the test gpio high
gpio.output(gpio_rec['gpio_num'], gpio.HIGH)
gpio.output(gpio_rec["gpio_num"], gpio.HIGH)
time.sleep(pin_settle_delay)
@ -219,14 +328,24 @@ def test_gpio_pin(gpio_rec):
for cur_gpio in scsi_signals:
# all of the gpios should be high
cur_val = gpio.input(cur_gpio)
if( cur_gpio == gpio_rec['gpio_num']):
if(cur_val != gpio.HIGH):
print("Error: Test commanded GPIO " + scsi_signals[gpio_rec['gpio_num']] + " to be high, but it did not respond")
err_count = err_count+1
if cur_gpio == gpio_rec["gpio_num"]:
if cur_val != gpio.HIGH:
print(
"Error: Test commanded GPIO "
+ scsi_signals[gpio_rec["gpio_num"]]
+ " to be high, but it did not respond"
)
err_count = err_count + 1
else:
if(cur_val != gpio.HIGH):
print("Error: GPIO " + scsi_signals[gpio_rec['gpio_num']] + " incorrectly pulled " + scsi_signals[cur_gpio] + " LOW, when it shouldn't have")
err_count = err_count+1
if cur_val != gpio.HIGH:
print(
"Error: GPIO "
+ scsi_signals[gpio_rec["gpio_num"]]
+ " incorrectly pulled "
+ scsi_signals[cur_gpio]
+ " LOW, when it shouldn't have"
)
err_count = err_count + 1
# Initialize the GPIO library, set all of the gpios associated with SCSI signals to outputs and set
@ -235,7 +354,7 @@ def setup():
gpio.setmode(gpio.BOARD)
gpio.setwarnings(False)
for cur_gpio in gpio_map:
gpio.setup(cur_gpio['gpio_num'], gpio.OUT, initial=gpio.HIGH)
gpio.setup(cur_gpio["gpio_num"], gpio.OUT, initial=gpio.HIGH)
# Setup direction control
gpio.setup(rascsi_ind_gpio, gpio.OUT)
@ -244,7 +363,7 @@ def setup():
# Main functions for running the actual test.
if __name__ == '__main__':
if __name__ == "__main__":
# setup the GPIOs
setup()
# Test each SCSI signal in the gpio_map
@ -252,7 +371,7 @@ if __name__ == '__main__':
test_gpio_pin(cur_gpio)
# Print the test results
if(err_count == 0):
if err_count == 0:
print("-------- Test PASSED --------")
else:
print("!!!!!!!! Test FAILED !!!!!!!!")

View File

@ -8,6 +8,7 @@ class GracefulInterruptHandler:
"""
Class for handling Linux signal interrupts
"""
def __init__(self, signals=(signal.SIGINT, signal.SIGTERM)):
self.signals = signals
self.original_handlers = {}

View File

@ -52,7 +52,7 @@ parser.add_argument(
default=180,
action="store",
help="The rotation of the screen buffer in degrees",
)
)
parser.add_argument(
"--height",
type=int,
@ -60,7 +60,7 @@ parser.add_argument(
default=32,
action="store",
help="The pixel height of the screen buffer",
)
)
parser.add_argument(
"--refresh_interval",
type=int,
@ -68,14 +68,14 @@ parser.add_argument(
default=1000,
action="store",
help="Interval in ms between each screen refresh",
)
)
parser.add_argument(
"--password",
type=str,
default="",
action="store",
help="Token password string for authenticating with the backend",
)
)
parser.add_argument(
"--host",
type=str,
@ -156,7 +156,7 @@ LINE_SPACING = 8
# When using other fonts, you may need to adjust PADDING, FONT_SIZE,
# LINE_SPACING, and LINES.
# Some other nice fonts to try: http://www.dafont.com/bitmap.php
FONT = ImageFont.truetype('resources/type_writer.ttf', FONT_SIZE)
FONT = ImageFont.truetype("resources/type_writer.ttf", FONT_SIZE)
REMOVABLE_DEVICE_TYPES = ractl_cmd.get_removable_device_types()
PERIPHERAL_DEVICE_TYPES = ractl_cmd.get_peripheral_device_types()
@ -176,6 +176,7 @@ DRAW = ImageDraw.Draw(IMAGE)
# Draw a black filled box to clear the image.
DRAW.rectangle((0, 0, WIDTH, HEIGHT), outline=0, fill=0)
def formatted_output():
"""
Formats the strings to be displayed on the Screen
@ -216,6 +217,7 @@ def formatted_output():
output += ["No network connection"]
return output
def shutdown():
"""
Display the shutdown splash, then blank the screen after a sleep
@ -224,7 +226,7 @@ def shutdown():
OLED.image(IMAGE_STOP)
OLED.show()
OLED.fill(0)
sleep(700/1000)
sleep(700 / 1000)
OLED.show()
sys.exit("Shutting down the OLED display...")
@ -264,7 +266,7 @@ with GracefulInterruptHandler() as handler:
# Display image.
OLED.image(IMAGE)
OLED.show()
sleep(args.refresh_interval/1000)
sleep(args.refresh_interval / 1000)
snapshot = formatted_output()

4
python/pyproject.toml Normal file
View File

@ -0,0 +1,4 @@
[tool.black]
line-length = 100
target-version = ['py37', 'py38', 'py39']
extend-exclude = ".*_pb2.py"

View File

@ -1,2 +0,0 @@
[flake8]
max-line-length = 100

View File

@ -1,8 +1,4 @@
[tool.pytest.ini_options]
addopts = "--junitxml=report.xml"
log_cli = true
log_cli_level = "warn"
[tool.black]
line-length = 100
target-version = ['py37', 'py38', 'py39']
log_cli_level = "warn"

View File

@ -1,4 +1,4 @@
"""Module for mapping between rascsi return codes and translated strings"""
"""Module for mapping between return codes and translated strings"""
from rascsi.return_codes import ReturnCodes
from flask_babel import _, lazy_gettext
@ -6,8 +6,9 @@ from flask_babel import _, lazy_gettext
# pylint: disable=too-few-public-methods
class ReturnCodeMapper:
"""Class for mapping between rascsi return codes and translated strings"""
"""Class for mapping between return codes and translated strings"""
# fmt: off
MESSAGES = {
ReturnCodes.DELETEFILE_SUCCESS:
_("File deleted: %(file_path)s"),
@ -46,11 +47,12 @@ class ReturnCodeMapper:
ReturnCodes.EXTRACTIMAGE_COMMAND_ERROR:
_("Unable to extract archive: %(error)s"),
}
# fmt: on
@staticmethod
def add_msg(payload):
"""adds a msg key to a given payload with a rascsi module return code
with a translated return code message string. """
"""adds a msg key to a given payload with a module return code
with a translated return code message string."""
if "return_code" not in payload:
return payload
@ -60,7 +62,7 @@ class ReturnCodeMapper:
payload["msg"] = lazy_gettext(
ReturnCodeMapper.MESSAGES[payload["return_code"]],
**parameters,
)
)
else:
payload["msg"] = lazy_gettext(ReturnCodeMapper.MESSAGES[payload["return_code"]])

View File

@ -30,4 +30,4 @@ TEMPLATE_THEMES = ["classic", "modern"]
TEMPLATE_THEME_DEFAULT = "modern"
# Fallback theme for older browsers
TEMPLATE_THEME_LEGACY = "classic"
TEMPLATE_THEME_LEGACY = "classic"

View File

@ -5,9 +5,11 @@ Module for sending and receiving data over a socket connection with the RaSCSI b
from flask import abort
from flask_babel import _
from rascsi.exceptions import (EmptySocketChunkException,
InvalidProtobufResponse,
FailedSocketConnectionException)
from rascsi.exceptions import (
EmptySocketChunkException,
InvalidProtobufResponse,
FailedSocketConnectionException,
)
from rascsi.socket_cmds import SocketCmds
@ -15,6 +17,7 @@ class SocketCmdsFlask(SocketCmds):
"""
Class for sending and receiving data over a socket connection with the RaSCSI backend
"""
# pylint: disable=useless-super-delegation
def __init__(self, host="localhost", port=6868):
super().__init__(host, port)
@ -28,11 +31,16 @@ class SocketCmdsFlask(SocketCmds):
return super().send_pb_command(payload)
except FailedSocketConnectionException as err:
# After failing all attempts, throw a 404 error
abort(404, _(
"The RaSCSI Web Interface failed to connect to RaSCSI at %(host)s:%(port)s "
"with error: %(error_msg)s. The RaSCSI process is not running or may have crashed.",
host=self.host, port=self.port, error_msg=str(err),
)
abort(
404,
_(
"The RaSCSI Web Interface failed to connect to RaSCSI at "
"%(host)s:%(port)s with error: %(error_msg)s. The RaSCSI "
"process is not running or may have crashed.",
host=self.host,
port=self.port,
error_msg=str(err),
),
)
return None
@ -42,19 +50,21 @@ class SocketCmdsFlask(SocketCmds):
return super().send_over_socket(sock, payload)
except EmptySocketChunkException:
abort(
503, _(
503,
_(
"The RaSCSI Web Interface lost connection to RaSCSI. "
"Please go back and try again. "
"If the issue persists, please report a bug."
)
),
)
return None
except InvalidProtobufResponse:
abort(
500, _(
500,
_(
"The RaSCSI Web Interface did not get a valid response from RaSCSI. "
"Please go back and try again. "
"If the issue persists, please report a bug."
)
),
)
return None

View File

@ -11,7 +11,6 @@ from grp import getgrall
import bjoern
from rascsi.return_codes import ReturnCodes
from werkzeug.utils import secure_filename
from simplepam import authenticate
from flask_babel import Babel, Locale, refresh, _
@ -75,6 +74,7 @@ from settings import (
APP = Flask(__name__)
BABEL = Babel(APP)
def get_env_info():
"""
Get information about the app/host environment
@ -113,7 +113,7 @@ def response(
redirect_url=None,
error=False,
status_code=200,
**kwargs
**kwargs,
):
"""
Generates a HTML or JSON HTTP response
@ -128,11 +128,16 @@ def response(
messages = [(str(message), status)]
if request.headers.get("accept") == "application/json":
return jsonify({
"status": status,
"messages": [{"message": m, "category": c} for m, c in messages],
"data": kwargs
}), status_code
return (
jsonify(
{
"status": status,
"messages": [{"message": m, "category": c} for m, c in messages],
"data": kwargs,
}
),
status_code,
)
if messages:
for message, category in messages:
@ -182,7 +187,7 @@ def get_supported_locales():
locales = [
{"language": x.language, "display_name": x.display_name}
for x in [*BABEL.list_translations(), Locale("en")]
]
]
return sorted(locales, key=lambda x: x["language"])
@ -199,8 +204,8 @@ def index():
_(
"RaSCSI is password protected. "
"Start the Web Interface with the --password parameter."
),
)
),
)
server_info = ractl_cmd.get_server_info()
devices = ractl_cmd.list_devices()
@ -233,18 +238,15 @@ def index():
# This might break if something like 'hdt' etc. gets added in the future.
sorted(
[suffix for suffix in server_info["schd"] if suffix not in {"hdi", "nhd"}],
reverse=True
) +
server_info["scrm"] +
server_info["scmo"]
reverse=True,
)
+ server_info["scrm"]
+ server_info["scmo"]
)
valid_image_suffixes = (
server_info["schd"] +
server_info["scrm"] +
server_info["scmo"] +
server_info["sccd"]
)
server_info["schd"] + server_info["scrm"] + server_info["scmo"] + server_info["sccd"]
)
return response(
template="index.html",
@ -296,7 +298,7 @@ def drive_list():
template="drives.html",
files=file_cmd.list_images()["files"],
drive_properties=format_drive_properties(APP.config["RASCSI_DRIVE_PROPERTIES"]),
)
)
@APP.route("/login", methods=["POST"])
@ -312,10 +314,14 @@ def login():
session["username"] = request.form["username"]
return response(env=get_env_info())
return response(error=True, status_code=401, message=_(
"You must log in with valid credentials for a user in the '%(group)s' group",
group=AUTH_GROUP,
))
return response(
error=True,
status_code=401,
message=_(
"You must log in with valid credentials for a user in the '%(group)s' group",
group=AUTH_GROUP,
),
)
@APP.route("/logout")
@ -339,12 +345,14 @@ def login_required(func):
"""
Wrapper method for enabling authentication for an endpoint
"""
@wraps(func)
def decorated_function(*args, **kwargs):
auth = auth_active(AUTH_GROUP)
if auth["status"] and "username" not in session:
return response(error=True, message=auth["msg"])
return func(*args, **kwargs)
return decorated_function
@ -357,10 +365,7 @@ def drive_create():
drive_name = request.form.get("drive_name")
file_name = Path(request.form.get("file_name")).name
properties = get_properties_by_drive_name(
APP.config["RASCSI_DRIVE_PROPERTIES"],
drive_name
)
properties = get_properties_by_drive_name(APP.config["RASCSI_DRIVE_PROPERTIES"], drive_name)
if not properties:
return response(
@ -373,7 +378,7 @@ def drive_create():
file_name,
properties["file_type"],
properties["size"],
)
)
if not process["status"]:
return response(error=True, message=process["msg"])
@ -385,8 +390,11 @@ def drive_create():
if not process["status"]:
return response(error=True, message=process["msg"])
return response(message=
_("Image file with properties created: %(file_name)s", file_name=full_file_name)
return response(
message=_(
"Image file with properties created: %(file_name)s",
file_name=full_file_name,
)
)
@ -401,10 +409,7 @@ def drive_cdrom():
# Creating the drive properties file
file_name = f"{file_name}.{PROPERTIES_SUFFIX}"
properties = get_properties_by_drive_name(
APP.config["RASCSI_DRIVE_PROPERTIES"],
drive_name
)
properties = get_properties_by_drive_name(APP.config["RASCSI_DRIVE_PROPERTIES"], drive_name)
if not properties:
return response(
@ -474,19 +479,17 @@ def show_diskinfo():
if not safe_path["status"]:
return response(error=True, message=safe_path["msg"])
server_info = ractl_cmd.get_server_info()
returncode, diskinfo = sys_cmd.get_diskinfo(
Path(server_info["image_dir"]) / file_name
)
returncode, diskinfo = sys_cmd.get_diskinfo(Path(server_info["image_dir"]) / file_name)
if returncode == 0:
return response(
template="diskinfo.html",
file_name=str(file_name),
diskinfo=diskinfo,
)
)
return response(
error=True,
message=_("An error occurred when getting disk info: %(error)s", error=diskinfo)
message=_("An error occurred when getting disk info: %(error)s", error=diskinfo),
)
@ -497,23 +500,20 @@ def show_manpage():
"""
app_allowlist = ["rascsi", "rasctl", "rasdump", "scsimon"]
app = request.args.get("app", type = str)
app = request.args.get("app", type=str)
if app not in app_allowlist:
return response(
error=True,
message=_("%(app)s is not a recognized RaSCSI app", app=app)
)
return response(error=True, message=_("%(app)s is not a recognized RaSCSI app", app=app))
file_path = f"{WEB_DIR}/../../../doc/{app}.1"
html_to_strip = [
"Content-type",
"!DOCTYPE",
"<HTML>",
"<HEAD>",
"<BODY>",
"<H1>",
]
"Content-type",
"!DOCTYPE",
"<HTML>",
"<HEAD>",
"<BODY>",
"<H1>",
]
returncode, manpage = sys_cmd.get_manpage(file_path)
if returncode == 0:
@ -524,7 +524,7 @@ def show_manpage():
line = line.replace("/?1+", "manpage?app=")
# Strip out useless hyperlink
elif "man2html" in line:
line = line.replace("<A HREF=\"/\">man2html</A>", "man2html")
line = line.replace('<A HREF="/">man2html</A>', "man2html")
if not any(ele in line for ele in html_to_strip):
formatted_manpage += line
@ -532,11 +532,11 @@ def show_manpage():
template="manpage.html",
app=app,
manpage=formatted_manpage,
)
)
return response(
error=True,
message=_("An error occurred when accessing man page: %(error)s", error=manpage)
message=_("An error occurred when accessing man page: %(error)s", error=manpage),
)
@ -555,11 +555,11 @@ def show_logs():
scope=scope,
lines=lines,
logs=logs,
)
)
return response(
error=True,
message=_("An error occurred when fetching logs: %(error)s", error=logs)
message=_("An error occurred when fetching logs: %(error)s", error=logs),
)
@ -615,10 +615,10 @@ def attach_device():
return response(error=True, message=bridge_status["msg"])
kwargs = {
"unit": int(unit),
"device_type": device_type,
"params": params,
}
"unit": int(unit),
"device_type": device_type,
"params": params,
}
if drive_props:
kwargs["vendor"] = drive_props["vendor"]
kwargs["product"] = drive_props["product"]
@ -628,12 +628,14 @@ def attach_device():
process = ractl_cmd.attach_device(scsi_id, **kwargs)
process = ReturnCodeMapper.add_msg(process)
if process["status"]:
return response(message=_(
"Attached %(device_type)s to SCSI ID %(id_number)s LUN %(unit_number)s",
device_type=get_device_name(device_type),
id_number=scsi_id,
unit_number=unit,
))
return response(
message=_(
"Attached %(device_type)s to SCSI ID %(id_number)s LUN %(unit_number)s",
device_type=get_device_name(device_type),
id_number=scsi_id,
unit_number=unit,
)
)
return response(error=True, message=process["msg"])
@ -687,7 +689,7 @@ def attach_image():
"The image may be corrupted, so proceed with caution.",
file_size,
expected_block_size,
)
)
return response(
message=_(
"Attached %(file_name)s as %(device_type)s to "
@ -696,8 +698,8 @@ def attach_image():
device_type=get_device_name(device_type),
id_number=scsi_id,
unit_number=unit,
)
)
)
return response(error=True, message=process["msg"])
@ -725,8 +727,13 @@ def detach():
unit = request.form.get("unit")
process = ractl_cmd.detach_by_id(scsi_id, unit)
if process["status"]:
return response(message=_("Detached SCSI ID %(id_number)s LUN %(unit_number)s",
id_number=scsi_id, unit_number=unit))
return response(
message=_(
"Detached SCSI ID %(id_number)s LUN %(unit_number)s",
id_number=scsi_id,
unit_number=unit,
)
)
return response(error=True, message=process["msg"])
@ -742,8 +749,13 @@ def eject():
process = ractl_cmd.eject_by_id(scsi_id, unit)
if process["status"]:
return response(message=_("Ejected SCSI ID %(id_number)s LUN %(unit_number)s",
id_number=scsi_id, unit_number=unit))
return response(
message=_(
"Ejected SCSI ID %(id_number)s LUN %(unit_number)s",
id_number=scsi_id,
unit_number=unit,
)
)
return response(error=True, message=process["msg"])
@ -758,7 +770,7 @@ def device_info():
return response(
template="deviceinfo.html",
devices=process["device_list"],
)
)
return response(error=True, message=_("No devices attached"))
@ -793,7 +805,9 @@ def release_id():
process = ractl_cmd.reserve_scsi_ids(reserved_ids)
if process["status"]:
RESERVATIONS[int(scsi_id)] = ""
return response(message=_("Released the reservation for SCSI ID %(id_number)s", id_number=scsi_id))
return response(
message=_("Released the reservation for SCSI ID %(id_number)s", id_number=scsi_id)
)
return response(error=True, message=process["msg"])
@ -868,7 +882,7 @@ def download_to_iso():
else:
return response(
error=True,
message=_("%(iso_type)s is not a valid CD-ROM format.", iso_type=iso_type)
message=_("%(iso_type)s is not a valid CD-ROM format.", iso_type=iso_type),
)
process = file_cmd.download_file_to_iso(url, *iso_args)
@ -883,10 +897,10 @@ def download_to_iso():
)
process_attach = ractl_cmd.attach_device(
scsi_id,
device_type="SCCD",
params={"file": process["file_name"]},
)
scsi_id,
device_type="SCCD",
params={"file": process["file_name"]},
)
process_attach = ReturnCodeMapper.add_msg(process_attach)
if process_attach["status"]:
return response(
@ -965,7 +979,7 @@ def create_file():
Creates an empty image file in the images dir
"""
file_name = Path(request.form.get("file_name"))
size = (int(request.form.get("size")) * 1024 * 1024)
size = int(request.form.get("size")) * 1024 * 1024
file_type = request.form.get("type")
drive_name = request.form.get("drive_name")
drive_format = request.form.get("drive_format")
@ -984,11 +998,11 @@ def create_file():
if drive_format:
volume_name = f"HD {size / 1024 / 1024:0.0f}M"
known_formats = [
"Lido 7.56",
"SpeedTools 3.6",
"FAT16",
"FAT32",
]
"Lido 7.56",
"SpeedTools 3.6",
"FAT16",
"FAT32",
]
message_postfix = f" ({drive_format})"
if drive_format not in known_formats:
@ -997,7 +1011,7 @@ def create_file():
message=_(
"%(drive_format)s is not a valid hard disk format.",
drive_format=drive_format,
)
),
)
elif drive_format.startswith("FAT"):
if drive_format == "FAT16":
@ -1010,7 +1024,7 @@ def create_file():
message=_(
"%(drive_format)s is not a valid hard disk format.",
drive_format=drive_format,
)
),
)
process = file_cmd.partition_disk(full_file_name, volume_name, "FAT")
@ -1018,11 +1032,11 @@ def create_file():
return response(error=True, message=process["msg"])
process = file_cmd.format_fat(
full_file_name,
# FAT volume labels are max 11 chars
volume_name[:11],
fat_size,
)
full_file_name,
# FAT volume labels are max 11 chars
volume_name[:11],
fat_size,
)
if not process["status"]:
return response(error=True, message=process["msg"])
@ -1033,19 +1047,16 @@ def create_file():
return response(error=True, message=process["msg"])
process = file_cmd.format_hfs(
full_file_name,
volume_name,
driver_base_path / Path(drive_format.replace(" ", "-") + ".img"),
)
full_file_name,
volume_name,
driver_base_path / Path(drive_format.replace(" ", "-") + ".img"),
)
if not process["status"]:
return response(error=True, message=process["msg"])
# Creating the drive properties file, if one is chosen
if drive_name:
properties = get_properties_by_drive_name(
APP.config["RASCSI_DRIVE_PROPERTIES"],
drive_name
)
properties = get_properties_by_drive_name(APP.config["RASCSI_DRIVE_PROPERTIES"], drive_name)
if properties:
prop_file_name = f"{full_file_name}.{PROPERTIES_SUFFIX}"
process = file_cmd.write_drive_properties(prop_file_name, properties)
@ -1221,26 +1232,23 @@ def extract_image():
safe_path = is_safe_path(archive_file)
if not safe_path["status"]:
return response(error=True, message=safe_path["msg"])
extract_result = file_cmd.extract_image(
str(archive_file),
archive_members
)
extract_result = file_cmd.extract_image(str(archive_file), archive_members)
if extract_result["return_code"] == ReturnCodes.EXTRACTIMAGE_SUCCESS:
for properties_file in extract_result["properties_files_moved"]:
if properties_file["status"]:
logging.info(
"Properties file %s moved to %s",
properties_file["name"],
CFG_DIR,
)
"Properties file %s moved to %s",
properties_file["name"],
CFG_DIR,
)
else:
logging.warning(
"Failed to move properties file %s to %s",
properties_file["name"],
CFG_DIR,
)
"Failed to move properties file %s to %s",
properties_file["name"],
CFG_DIR,
)
return response(message=ReturnCodeMapper.add_msg(extract_result).get("msg"))
@ -1300,28 +1308,28 @@ if __name__ == "__main__":
default=8080,
action="store",
help="Port number the web server will run on",
)
)
parser.add_argument(
"--password",
type=str,
default="",
action="store",
help="Token password string for authenticating with RaSCSI",
)
)
parser.add_argument(
"--rascsi-host",
type=str,
default="localhost",
action="store",
help="RaSCSI host. Default: localhost",
)
)
parser.add_argument(
"--rascsi-port",
type=int,
default=6868,
action="store",
help="RaSCSI port. Default: 6868",
)
)
parser.add_argument(
"--log-level",
type=str,
@ -1329,12 +1337,12 @@ if __name__ == "__main__":
action="store",
help="Log level for Web UI. Default: warning",
choices=["debug", "info", "warning", "error", "critical"],
)
)
parser.add_argument(
"--dev-mode",
action="store_true",
help="Run in development mode"
)
help="Run in development mode",
)
arguments = parser.parse_args()
APP.config["RASCSI_TOKEN"] = arguments.password
@ -1357,14 +1365,17 @@ if __name__ == "__main__":
APP.config["RASCSI_DRIVE_PROPERTIES"] = []
logging.warning("Could not read drive properties from %s", DRIVE_PROPERTIES_FILE)
logging.basicConfig(stream=sys.stdout,
format="%(asctime)s %(levelname)s %(filename)s:%(lineno)s %(message)s",
level=arguments.log_level.upper())
logging.basicConfig(
stream=sys.stdout,
format="%(asctime)s %(levelname)s %(filename)s:%(lineno)s %(message)s",
level=arguments.log_level.upper(),
)
if arguments.dev_mode:
print("Running rascsi-web in development mode ...")
APP.debug = True
from werkzeug.debug import DebuggedApplication
try:
bjoern.run(DebuggedApplication(APP, evalex=False), "0.0.0.0", arguments.port)
except KeyboardInterrupt:

View File

@ -14,6 +14,7 @@ from werkzeug.utils import secure_filename
from rascsi.sys_cmds import SysCmds
def get_valid_scsi_ids(devices, reserved_ids):
"""
Takes a list of (dict)s devices, and list of (int)s reserved_ids.
@ -43,7 +44,7 @@ def get_valid_scsi_ids(devices, reserved_ids):
"valid_ids": valid_ids,
"occupied_ids": occupied_ids,
"recommended_id": recommended_id,
}
}
def sort_and_format_devices(devices):
@ -180,7 +181,8 @@ def format_drive_properties(drive_properties):
"cd_conf": cd_conf,
"rm_conf": rm_conf,
"mo_conf": mo_conf,
}
}
def get_properties_by_drive_name(drives, drive_name):
"""
@ -198,11 +200,12 @@ def get_properties_by_drive_name(drives, drive_name):
"revision": drive["revision"],
"block_size": drive["block_size"],
"size": drive["size"],
}
}
logging.error("Properties for drive '%s' does not exist in database", drive_name)
return False
def auth_active(group):
"""
Inspects if the group defined in (str) group exists on the system.
@ -212,9 +215,9 @@ def auth_active(group):
groups = [g.gr_name for g in getgrall()]
if group in groups:
return {
"status": True,
"msg": _("You must log in to use this function"),
}
"status": True,
"msg": _("You must log in to use this function"),
}
return {"status": False, "msg": ""}
@ -272,7 +275,7 @@ def upload_with_dropzonejs(image_dir):
file_name = secure_filename(file_object.filename)
save_path = path.join(image_dir, file_name)
current_chunk = int(request.form['dzchunkindex'])
current_chunk = int(request.form["dzchunkindex"])
# Makes sure not to overwrite an existing file,
# but continues writing to a file transfer in progress
@ -307,21 +310,21 @@ def browser_supports_modern_themes():
return False
user_agent = user_agent_parser.Parse(user_agent_string)
if not user_agent['user_agent']['family']:
if not user_agent["user_agent"]["family"]:
return False
# (family, minimum version)
supported_browsers = [
('Safari', 14),
('Chrome', 100),
('Firefox', 100),
('Edge', 100),
('Mobile Safari', 14),
('Chrome Mobile', 100),
("Safari", 14),
("Chrome", 100),
("Firefox", 100),
("Edge", 100),
("Mobile Safari", 14),
("Chrome Mobile", 100),
]
current_ua_family = user_agent['user_agent']['family']
current_ua_version = float(user_agent['user_agent']['major'])
current_ua_family = user_agent["user_agent"]["family"]
current_ua_version = float(user_agent["user_agent"]["major"])
logging.info(f"Identified browser as family={current_ua_family}, version={current_ua_version}")
for supported_browser, supported_version in supported_browsers: