diff --git a/python/.flake8 b/python/.flake8 new file mode 100644 index 00000000..8b48d305 --- /dev/null +++ b/python/.flake8 @@ -0,0 +1,5 @@ +[flake8] +max-line-length = 100 +exclude = + venv + rascsi_interface_pb2.py \ No newline at end of file diff --git a/python/common/src/rascsi/common_settings.py b/python/common/src/rascsi/common_settings.py index 55487161..59064a49 100644 --- a/python/common/src/rascsi/common_settings.py +++ b/python/common/src/rascsi/common_settings.py @@ -13,17 +13,11 @@ CONFIG_FILE_SUFFIX = "json" PROPERTIES_SUFFIX = "properties" # Supported archive file suffixes -ARCHIVE_FILE_SUFFIXES = [ - "zip", - "sit", - "tar", - "gz", - "7z" - ] +ARCHIVE_FILE_SUFFIXES = ["zip", "sit", "tar", "gz", "7z"] # The RESERVATIONS list is used to keep track of the reserved ID memos. # Initialize with a list of 8 empty strings. RESERVATIONS = ["" for _ in range(0, 8)] # Standard error message for shell commands -SHELL_ERROR = "Shell command: \"%s\" led to error: %s" +SHELL_ERROR = 'Shell command: "%s" led to error: %s' diff --git a/python/common/src/rascsi/exceptions.py b/python/common/src/rascsi/exceptions.py index e06b5f61..b15856b0 100644 --- a/python/common/src/rascsi/exceptions.py +++ b/python/common/src/rascsi/exceptions.py @@ -8,8 +8,8 @@ class FailedSocketConnectionException(Exception): class EmptySocketChunkException(Exception): - """Raise when a socket payload contains an empty chunk which implies a possible problem. """ + """Raise when a socket payload contains an empty chunk which implies a possible problem.""" class InvalidProtobufResponse(Exception): - """Raise when a rascsi socket payload contains unpexpected data. """ + """Raise when a rascsi socket payload contains unpexpected data.""" diff --git a/python/common/src/rascsi/file_cmds.py b/python/common/src/rascsi/file_cmds.py index b1735df1..ff9ca0be 100644 --- a/python/common/src/rascsi/file_cmds.py +++ b/python/common/src/rascsi/file_cmds.py @@ -35,6 +35,7 @@ FILE_READ_ERROR = "Unhandled exception when reading file: %s" FILE_WRITE_ERROR = "Unhandled exception when writing to file: %s" URL_SAFE = "/:?&" + class FileCmds: """ class for methods reading from and writing to the file system @@ -57,15 +58,10 @@ class FileCmds: files_list = [] for file_path, _dirs, files in walk(dir_path): # Only list selected file types + # TODO: Refactor for readability? files = [f for f in files if f.lower().endswith(file_types)] files_list.extend( - [ - ( - file, - path.getsize(path.join(file_path, file)) - ) - for file in files - ] + [(file, path.getsize(path.join(file_path, file))) for file in files], ) return files_list @@ -108,7 +104,7 @@ class FileCmds: if file.name in prop_files: process = self.read_drive_properties( Path(CFG_DIR) / f"{file.name}.{PROPERTIES_SUFFIX}" - ) + ) prop = process["conf"] else: prop = False @@ -118,12 +114,14 @@ class FileCmds: try: archive_info = self._get_archive_info( f"{server_info['image_dir']}/{file.name}", - _cache_extra_key=file.size - ) + _cache_extra_key=file.size, + ) - properties_files = [x["path"] - for x in archive_info["members"] - if x["path"].endswith(PROPERTIES_SUFFIX)] + properties_files = [ + x["path"] + for x in archive_info["members"] + if x["path"].endswith(PROPERTIES_SUFFIX) + ] for member in archive_info["members"]: if member["is_dir"] or member["is_resource_fork"]: @@ -132,7 +130,9 @@ class FileCmds: if PurePath(member["path"]).suffix.lower()[1:] == PROPERTIES_SUFFIX: member["is_properties_file"] = True elif f"{member['path']}.{PROPERTIES_SUFFIX}" in properties_files: - member["related_properties_file"] = f"{member['path']}.{PROPERTIES_SUFFIX}" + member[ + "related_properties_file" + ] = f"{member['path']}.{PROPERTIES_SUFFIX}" archive_contents.append(member) except (unarchiver.LsarCommandError, unarchiver.LsarOutputError): @@ -140,14 +140,16 @@ class FileCmds: size_mb = "{:,.1f}".format(file.size / 1024 / 1024) dtype = proto.PbDeviceType.Name(file.type) - files.append({ - "name": file.name, - "size": file.size, - "size_mb": size_mb, - "detected_type": dtype, - "prop": prop, - "archive_contents": archive_contents, - }) + files.append( + { + "name": file.name, + "size": file.size, + "size_mb": size_mb, + "detected_type": dtype, + "prop": prop, + "archive_contents": archive_contents, + } + ) return {"status": result.status, "msg": result.msg, "files": files} @@ -233,22 +235,20 @@ class FileCmds: Takes (Path) file_path for the file to delete Returns (dict) with (bool) status and (str) msg """ - parameters = { - "file_path": file_path - } + parameters = {"file_path": file_path} if file_path.exists(): file_path.unlink() return { - "status": True, - "return_code": ReturnCodes.DELETEFILE_SUCCESS, - "parameters": parameters, - } - return { - "status": False, - "return_code": ReturnCodes.DELETEFILE_FILE_NOT_FOUND, + "status": True, + "return_code": ReturnCodes.DELETEFILE_SUCCESS, "parameters": parameters, } + return { + "status": False, + "return_code": ReturnCodes.DELETEFILE_FILE_NOT_FOUND, + "parameters": parameters, + } # noinspection PyMethodMayBeStatic def rename_file(self, file_path, target_path): @@ -258,21 +258,19 @@ class FileCmds: - (Path) target_path for the name to rename Returns (dict) with (bool) status and (str) msg """ - parameters = { - "target_path": target_path - } + parameters = {"target_path": target_path} if target_path.parent.exists: file_path.rename(target_path) return { "status": True, "return_code": ReturnCodes.RENAMEFILE_SUCCESS, "parameters": parameters, - } + } return { "status": False, "return_code": ReturnCodes.RENAMEFILE_UNABLE_TO_MOVE, "parameters": parameters, - } + } # noinspection PyMethodMayBeStatic def copy_file(self, file_path, target_path): @@ -282,21 +280,19 @@ class FileCmds: - (Path) target_path for the name to copy to Returns (dict) with (bool) status and (str) msg """ - parameters = { - "target_path": target_path - } + parameters = {"target_path": target_path} if target_path.parent.exists: copyfile(str(file_path), str(target_path)) return { "status": True, "return_code": ReturnCodes.WRITEFILE_SUCCESS, "parameters": parameters, - } + } return { "status": False, "return_code": ReturnCodes.WRITEFILE_UNABLE_TO_WRITE, "parameters": parameters, - } + } def extract_image(self, file_path, members=None, move_properties_files_to_config=True): """ @@ -312,60 +308,66 @@ class FileCmds: return { "status": False, "return_code": ReturnCodes.EXTRACTIMAGE_NO_FILES_SPECIFIED, - } + } try: extract_result = unarchiver.extract_archive( f"{server_info['image_dir']}/{file_path}", members=members, output_dir=server_info["image_dir"], - ) + ) properties_files_moved = [] if move_properties_files_to_config: for file in extract_result["extracted"]: if file.get("name").endswith(f".{PROPERTIES_SUFFIX}"): prop_path = Path(CFG_DIR) / file["name"] - if (self.rename_file( - Path(file["absolute_path"]), - prop_path, - )): - properties_files_moved.append({ - "status": True, - "name": file["path"], - "path": str(prop_path), - }) + if self.rename_file( + Path(file["absolute_path"]), + prop_path, + ): + properties_files_moved.append( + { + "status": True, + "name": file["path"], + "path": str(prop_path), + } + ) else: - properties_files_moved.append({ - "status": False, - "name": file["path"], - "path": str(prop_path), - }) + properties_files_moved.append( + { + "status": False, + "name": file["path"], + "path": str(prop_path), + } + ) return { "status": True, "return_code": ReturnCodes.EXTRACTIMAGE_SUCCESS, "parameters": { "count": len(extract_result["extracted"]), - }, + }, "extracted": extract_result["extracted"], "skipped": extract_result["skipped"], "properties_files_moved": properties_files_moved, - } + } except unarchiver.UnarNoFilesExtractedError: return { "status": False, "return_code": ReturnCodes.EXTRACTIMAGE_NO_FILES_EXTRACTED, - } - except (unarchiver.UnarCommandError, unarchiver.UnarUnexpectedOutputError) as error: + } + except ( + unarchiver.UnarCommandError, + unarchiver.UnarUnexpectedOutputError, + ) as error: return { "status": False, "return_code": ReturnCodes.EXTRACTIMAGE_COMMAND_ERROR, "parameters": { "error": error, - } - } - + }, + } # noinspection PyMethodMayBeStatic def partition_disk(self, file_name, volume_name, disk_format): @@ -399,42 +401,42 @@ class FileCmds: if disk_format == "HFS": partitioning_tool = "hfdisk" commands = [ - "i", - "", - "C", - "", - "32", - "Driver_Partition", - "Apple_Driver", - "C", - "", - "", - volume_name, - "Apple_HFS", - "w", - "y", - "p", - ] + "i", + "", + "C", + "", + "32", + "Driver_Partition", + "Apple_Driver", + "C", + "", + "", + volume_name, + "Apple_HFS", + "w", + "y", + "p", + ] # Create a DOS label, primary partition, W95 FAT type elif disk_format == "FAT": partitioning_tool = "fdisk" commands = [ - "o", - "n", - "p", - "", - "", - "", - "t", - "b", - "w", - ] + "o", + "n", + "p", + "", + "", + "", + "t", + "b", + "w", + ] try: process = Popen( - [partitioning_tool, str(full_file_path)], - stdin=PIPE, - stdout=PIPE, - ) + [partitioning_tool, str(full_file_path)], + stdin=PIPE, + stdout=PIPE, + ) for command in commands: process.stdin.write(bytes(command + "\n", "utf-8")) process.stdin.flush() @@ -464,7 +466,6 @@ class FileCmds: return {"status": True, "msg": ""} - # noinspection PyMethodMayBeStatic def format_hfs(self, file_name, volume_name, driver_path): """ @@ -514,7 +515,6 @@ class FileCmds: return {"status": True, "msg": ""} - # noinspection PyMethodMayBeStatic def format_fat(self, file_name, volume_name, fat_size): """ @@ -538,21 +538,21 @@ class FileCmds: else: logging.info(process.stdout.decode("utf-8")) self.delete_file(Path(file_name)) - return {"status": False, "msg": error.stderr.decode("utf-8")} + return {"status": False, "msg": process.stderr.decode("utf-8")} except (FileNotFoundError, CalledProcessError) as error: logging.warning(SHELL_ERROR, " ".join(error.cmd), error.stderr.decode("utf-8")) self.delete_file(Path(file_name)) return {"status": False, "msg": error.stderr.decode("utf-8")} - args = [ - "mkfs.fat", - "-v", - "-F", - fat_size, - "-n", - volume_name, - "/dev/mapper/" + loopback_device, - ] + args = [ + "mkfs.fat", + "-v", + "-F", + fat_size, + "-n", + volume_name, + "/dev/mapper/" + loopback_device, + ] try: process = run( args, @@ -582,7 +582,6 @@ class FileCmds: return {"status": True, "msg": ""} - def download_file_to_iso(self, url, *iso_args): """ Takes (str) url and one or more (str) *iso_args @@ -592,7 +591,7 @@ class FileCmds: server_info = self.ractl.get_server_info() file_name = PurePath(url).name - iso_filename = Path(server_info['image_dir']) / f"{file_name}.iso" + iso_filename = Path(server_info["image_dir"]) / f"{file_name}.iso" with TemporaryDirectory() as tmp_dir: req_proc = self.download_to_dir(quote(url, safe=URL_SAFE), tmp_dir, file_name) @@ -603,23 +602,30 @@ class FileCmds: tmp_full_path = Path(tmp_dir) / file_name if is_zipfile(tmp_full_path): if "XtraStuf.mac" in str(ZipFile(str(tmp_full_path)).namelist()): - logging.info("MacZip file format detected. Will not unzip to retain resource fork.") + logging.info( + "MacZip file format detected. Will not unzip to retain resource fork." + ) else: logging.info( "%s is a zipfile! Will attempt to unzip and store the resulting files.", tmp_full_path, + ) + unzip_proc = asyncio.run( + self.run_async( + "unzip", + [ + "-d", + str(tmp_dir), + "-n", + str(tmp_full_path), + ], ) - unzip_proc = asyncio.run(self.run_async("unzip", [ - "-d", - str(tmp_dir), - "-n", - str(tmp_full_path), - ])) + ) if not unzip_proc["returncode"]: logging.info( "%s was successfully unzipped. Deleting the zipfile.", tmp_full_path, - ) + ) tmp_full_path.unlink(True) try: @@ -638,9 +644,7 @@ class FileCmds: logging.warning(SHELL_ERROR, " ".join(error.cmd), error.stderr.decode("utf-8")) return {"status": False, "msg": error.stderr.decode("utf-8")} - parameters = { - "value": " ".join(iso_args) - } + parameters = {"value": " ".join(iso_args)} return { "status": True, "return_code": ReturnCodes.DOWNLOADFILETOISO_SUCCESS, @@ -658,10 +662,10 @@ class FileCmds: try: with requests.get( - quote(url, safe=URL_SAFE), - stream=True, - headers={"User-Agent": "Mozilla/5.0"}, - ) as req: + quote(url, safe=URL_SAFE), + stream=True, + headers={"User-Agent": "Mozilla/5.0"}, + ) as req: req.raise_for_status() try: with open(f"{save_dir}/{file_name}", "wb") as download: @@ -677,10 +681,7 @@ class FileCmds: logging.info("Response content-type: %s", req.headers["content-type"]) logging.info("Response status code: %s", req.status_code) - parameters = { - "file_name": file_name, - "save_dir": save_dir - } + parameters = {"file_name": file_name, "save_dir": save_dir} return { "status": True, "return_code": ReturnCodes.DOWNLOADTODIR_SUCCESS, @@ -715,28 +716,29 @@ class FileCmds: reserved_ids_and_memos = [] reserved_ids = self.ractl.get_reserved_ids()["ids"] for scsi_id in reserved_ids: - reserved_ids_and_memos.append({"id": scsi_id, - "memo": RESERVATIONS[int(scsi_id)]}) - dump( - {"version": version, - "devices": devices, - "reserved_ids": reserved_ids_and_memos}, - json_file, - indent=4 + reserved_ids_and_memos.append( + {"id": scsi_id, "memo": RESERVATIONS[int(scsi_id)]} ) - parameters = { - "target_path": file_path - } + dump( + { + "version": version, + "devices": devices, + "reserved_ids": reserved_ids_and_memos, + }, + json_file, + indent=4, + ) + parameters = {"target_path": file_path} return { "status": True, "return_code": ReturnCodes.WRITEFILE_SUCCESS, "parameters": parameters, - } + } except (IOError, ValueError, EOFError, TypeError) as error: logging.error(str(error)) self.delete_file(Path(file_path)) return {"status": False, "msg": str(error)} - except: + except Exception: logging.error(FILE_WRITE_ERROR, file_name) self.delete_file(Path(file_path)) raise @@ -770,7 +772,7 @@ class FileCmds: "revision": row["revision"], "block_size": row["block_size"], "params": dict(row["params"]), - } + } if row["image"]: kwargs["params"]["file"] = row["image"] self.ractl.attach_device(row["id"], **kwargs) @@ -789,27 +791,27 @@ class FileCmds: "revision": row["revision"], "block_size": row["block_size"], "params": dict(row["params"]), - } + } if row["image"]: kwargs["params"]["file"] = row["image"] self.ractl.attach_device(row["id"], **kwargs) logging.warning("%s is in an obsolete config file format", file_name) else: - return {"status": False, - "return_code": ReturnCodes.READCONFIG_INVALID_CONFIG_FILE_FORMAT} + return { + "status": False, + "return_code": ReturnCodes.READCONFIG_INVALID_CONFIG_FILE_FORMAT, + } - parameters = { - "file_name": file_name - } + parameters = {"file_name": file_name} return { "status": True, "return_code": ReturnCodes.READCONFIG_SUCCESS, - "parameters": parameters - } + "parameters": parameters, + } except (IOError, ValueError, EOFError, TypeError) as error: logging.error(str(error)) return {"status": False, "msg": str(error)} - except: + except Exception: logging.error(FILE_READ_ERROR, str(file_path)) raise @@ -823,19 +825,17 @@ class FileCmds: try: with open(file_path, "w") as json_file: dump(conf, json_file, indent=4) - parameters = { - "target_path": str(file_path) - } + parameters = {"target_path": str(file_path)} return { "status": True, "return_code": ReturnCodes.WRITEFILE_SUCCESS, "parameters": parameters, - } + } except (IOError, ValueError, EOFError, TypeError) as error: logging.error(str(error)) self.delete_file(file_path) return {"status": False, "msg": str(error)} - except: + except Exception: logging.error(FILE_WRITE_ERROR, str(file_path)) self.delete_file(file_path) raise @@ -850,19 +850,17 @@ class FileCmds: try: with open(file_path) as json_file: conf = load(json_file) - parameters = { - "file_path": str(file_path) - } + parameters = {"file_path": str(file_path)} return { "status": True, "return_codes": ReturnCodes.READDRIVEPROPS_SUCCESS, "parameters": parameters, "conf": conf, - } + } except (IOError, ValueError, EOFError, TypeError) as error: logging.error(str(error)) return {"status": False, "msg": str(error)} - except: + except Exception: logging.error(FILE_READ_ERROR, str(file_path)) raise @@ -877,11 +875,17 @@ class FileCmds: program, *args, stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE) + stderr=asyncio.subprocess.PIPE, + ) stdout, stderr = await proc.communicate() - logging.info("Executed command \"%s %s\" with status code %d", program, " ".join(args), proc.returncode) + logging.info( + 'Executed command "%s %s" with status code %d', + program, + " ".join(args), + proc.returncode, + ) if stdout: stdout = stdout.decode() logging.info("stdout: %s", stdout) diff --git a/python/common/src/rascsi/ractl_cmds.py b/python/common/src/rascsi/ractl_cmds.py index 22d8136f..a58a5f00 100644 --- a/python/common/src/rascsi/ractl_cmds.py +++ b/python/common/src/rascsi/ractl_cmds.py @@ -11,6 +11,7 @@ class RaCtlCmds: """ Class for commands sent to the RaSCSI backend service. """ + def __init__(self, sock_cmd: SocketCmds, token=None, locale="en"): self.sock_cmd = sock_cmd self.token = token @@ -37,9 +38,13 @@ class RaCtlCmds: data = self.sock_cmd.send_pb_command(command.SerializeToString()) result = proto.PbResult() result.ParseFromString(data) - version = (str(result.server_info.version_info.major_version) + "." + - str(result.server_info.version_info.minor_version) + "." + - str(result.server_info.version_info.patch_version)) + version = ( + str(result.server_info.version_info.major_version) + + "." + + str(result.server_info.version_info.minor_version) + + "." + + str(result.server_info.version_info.patch_version) + ) log_levels = list(result.server_info.log_level_info.log_levels) current_log_level = result.server_info.log_level_info.current_log_level reserved_ids = list(result.server_info.reserved_ids_info.ids) @@ -74,7 +79,7 @@ class RaCtlCmds: "scrm": scrm, "scmo": scmo, "sccd": sccd, - } + } def get_reserved_ids(self): """ @@ -137,11 +142,11 @@ class RaCtlCmds: for key, value in device.properties.default_params.items(): params[key] = value device_types[proto.PbDeviceType.Name(device.type)] = { - "removable": device.properties.removable, - "supports_file": device.properties.supports_file, - "params": params, - "block_sizes": list(device.properties.block_sizes), - } + "removable": device.properties.removable, + "supports_file": device.properties.supports_file, + "params": params, + "block_sizes": list(device.properties.block_sizes), + } return {"status": result.status, "device_types": device_types} def get_removable_device_types(self): @@ -176,8 +181,8 @@ class RaCtlCmds: device_types = self.get_device_types() image_device_types = self.get_disk_device_types() peripheral_device_types = [ - x for x in device_types["device_types"] if x not in image_device_types - ] + x for x in device_types["device_types"] if x not in image_device_types + ] return peripheral_device_types def get_image_files_info(self): @@ -205,7 +210,7 @@ class RaCtlCmds: "images_dir": images_dir, "image_files": image_files, "scan_depth": scan_depth, - } + } def attach_device(self, scsi_id, **kwargs): """ @@ -245,13 +250,13 @@ class RaCtlCmds: if current_type != device_type: parameters = { "device_type": device_type, - "current_device_type": current_type + "current_device_type": current_type, } return { "status": False, "return_code": ReturnCodes.ATTACHIMAGE_COULD_NOT_ATTACH, "parameters": parameters, - } + } command.operation = proto.PbOperation.INSERT # Handling attaching a new device @@ -394,20 +399,22 @@ class RaCtlCmds: dblock = result.devices_info.devices[i].block_size dsize = int(result.devices_info.devices[i].block_count) * int(dblock) - device_list.append({ - "id": did, - "unit": dunit, - "device_type": dtype, - "status": ", ".join(dstat_msg), - "image": dpath, - "file": dfile, - "params": dparam, - "vendor": dven, - "product": dprod, - "revision": drev, - "block_size": dblock, - "size": dsize, - }) + device_list.append( + { + "id": did, + "unit": dunit, + "device_type": dtype, + "status": ", ".join(dstat_msg), + "image": dpath, + "file": dfile, + "params": dparam, + "vendor": dven, + "product": dprod, + "revision": drev, + "block_size": dblock, + "size": dsize, + } + ) i += 1 return {"status": result.status, "msg": result.msg, "device_list": device_list} diff --git a/python/common/src/rascsi/return_codes.py b/python/common/src/rascsi/return_codes.py index 638a61c5..66d6926b 100644 --- a/python/common/src/rascsi/return_codes.py +++ b/python/common/src/rascsi/return_codes.py @@ -6,6 +6,7 @@ Module for return codes that are refrenced in the return payloads of the rascsi # pylint: disable=too-few-public-methods class ReturnCodes: """Class for the return codes used within the rascsi module.""" + DELETEFILE_SUCCESS = 0 DELETEFILE_FILE_NOT_FOUND = 1 RENAMEFILE_SUCCESS = 10 diff --git a/python/common/src/rascsi/socket_cmds.py b/python/common/src/rascsi/socket_cmds.py index 5b5a7f04..cee8fd29 100644 --- a/python/common/src/rascsi/socket_cmds.py +++ b/python/common/src/rascsi/socket_cmds.py @@ -7,15 +7,18 @@ import socket from time import sleep from struct import pack, unpack -from rascsi.exceptions import (EmptySocketChunkException, - InvalidProtobufResponse, - FailedSocketConnectionException) +from rascsi.exceptions import ( + EmptySocketChunkException, + InvalidProtobufResponse, + FailedSocketConnectionException, +) class SocketCmds: """ Class for sending and receiving data over a socket connection with the RaSCSI backend """ + def __init__(self, host="localhost", port=6868): self.host = host self.port = port @@ -38,8 +41,11 @@ class SocketCmds: return response except socket.error as error: counter += 1 - logging.warning("The RaSCSI service is not responding - attempt %s/%s", - str(counter), str(tries)) + logging.warning( + "The RaSCSI service is not responding - attempt %s/%s", + str(counter), + str(tries), + ) error_msg = str(error) sleep(0.2) except EmptySocketChunkException as ex: @@ -75,18 +81,22 @@ class SocketCmds: bytes_recvd = 0 while bytes_recvd < response_length: chunk = sock.recv(min(response_length - bytes_recvd, 2048)) - if chunk == b'': - error_message = ("Read an empty chunk from the socket. Socket connection has " - "dropped unexpectedly. RaSCSI may have crashed.") + if chunk == b"": + error_message = ( + "Read an empty chunk from the socket. Socket connection has " + "dropped unexpectedly. RaSCSI may have crashed." + ) logging.error(error_message) raise EmptySocketChunkException(error_message) chunks.append(chunk) bytes_recvd = bytes_recvd + len(chunk) - response_message = b''.join(chunks) + response_message = b"".join(chunks) return response_message - error_message = ("The response from RaSCSI did not contain a protobuf header. " - "RaSCSI may have crashed.") + error_message = ( + "The response from RaSCSI did not contain a protobuf header. " + "RaSCSI may have crashed." + ) logging.error(error_message) raise InvalidProtobufResponse(error_message) diff --git a/python/common/src/rascsi/sys_cmds.py b/python/common/src/rascsi/sys_cmds.py index c2609a43..647fef2f 100644 --- a/python/common/src/rascsi/sys_cmds.py +++ b/python/common/src/rascsi/sys_cmds.py @@ -12,6 +12,7 @@ from platform import uname from rascsi.common_settings import SHELL_ERROR + class SysCmds: """ Class for commands sent to the Pi's Linux system. @@ -30,7 +31,7 @@ class SysCmds: ["git", "rev-parse", "HEAD"], capture_output=True, check=True, - ) + ) .stdout.decode("utf-8") .strip() ) @@ -68,7 +69,7 @@ class SysCmds: return { "git": ra_git_version, "env": f"{hardware}, {env.system} {env.release} {env.machine}", - } + } @staticmethod def running_proc(daemon): @@ -82,7 +83,7 @@ class SysCmds: ["ps", "aux"], capture_output=True, check=True, - ) + ) .stdout.decode("utf-8") .strip() ) @@ -104,7 +105,7 @@ class SysCmds: ["brctl", "show"], capture_output=True, check=True, - ) + ) .stdout.decode("utf-8") .strip() ) @@ -155,7 +156,7 @@ class SysCmds: sock = socket(AF_INET, SOCK_DGRAM) try: # mock ip address; doesn't have to be reachable - sock.connect(('10.255.255.255', 1)) + sock.connect(("10.255.255.255", 1)) ip_addr = sock.getsockname()[0] except Exception: ip_addr = False @@ -170,10 +171,10 @@ class SysCmds: """ try: process = run( - ["hostnamectl", "status", "--pretty"], - capture_output=True, - check=True, - ) + ["hostnamectl", "status", "--pretty"], + capture_output=True, + check=True, + ) pretty_hostname = process.stdout.decode("utf-8").rstrip() if pretty_hostname: return pretty_hostname @@ -188,11 +189,11 @@ class SysCmds: Set the pretty hostname for the system """ try: - process = run( - ["sudo", "hostnamectl", "set-hostname", "--pretty", name], - capture_output=False, - check=True, - ) + run( + ["sudo", "hostnamectl", "set-hostname", "--pretty", name], + capture_output=False, + check=True, + ) except CalledProcessError as error: logging.error(str(error)) return False @@ -213,9 +214,9 @@ class SysCmds: if scope: scope_param = ["-u", scope] process = run( - ["journalctl"] + line_param + scope_param, - capture_output=True, - ) + ["journalctl"] + line_param + scope_param, + capture_output=True, + ) if process.returncode == 0: return process.returncode, process.stdout.decode("utf-8") @@ -228,9 +229,9 @@ class SysCmds: Returns either the disktype output, or the stderr output. """ process = run( - ["disktype", file_path], - capture_output=True, - ) + ["disktype", file_path], + capture_output=True, + ) if process.returncode == 0: return process.returncode, process.stdout.decode("utf-8") @@ -243,9 +244,9 @@ class SysCmds: Returns either the man2html output, or the stderr output. """ process = run( - ["man2html", file_path, "-M", "/"], - capture_output=True, - ) + ["man2html", file_path, "-M", "/"], + capture_output=True, + ) if process.returncode == 0: return process.returncode, process.stdout.decode("utf-8") @@ -257,9 +258,9 @@ class SysCmds: Sends a reboot command to the system """ process = run( - ["sudo", "reboot"], - capture_output=True, - ) + ["sudo", "reboot"], + capture_output=True, + ) if process.returncode == 0: return process.returncode, process.stdout.decode("utf-8") @@ -271,9 +272,9 @@ class SysCmds: Sends a shutdown command to the system """ process = run( - ["sudo", "shutdown", "-h", "now"], - capture_output=True, - ) + ["sudo", "shutdown", "-h", "now"], + capture_output=True, + ) if process.returncode == 0: return process.returncode, process.stdout.decode("utf-8") diff --git a/python/common/src/util/run.py b/python/common/src/util/run.py index fe60fcd0..620e0986 100644 --- a/python/common/src/util/run.py +++ b/python/common/src/util/run.py @@ -4,31 +4,27 @@ Utility module for running system commands with basic logging import asyncio import logging -import os def run(program, args=None): - """ Run a command and return its output """ + """Run a command and return its output""" return asyncio.run(run_async(program, args)) async def run_async(program, args=None): - """ Run a command in the background """ + """Run a command in the background""" proc = await asyncio.create_subprocess_exec( - program, - *args, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE - ) + program, *args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE + ) stdout, stderr = await proc.communicate() logging.info( - "Executed command \"%s %s\" with status code %d", + 'Executed command "%s %s" with status code %d', program, " ".join(args), - proc.returncode - ) + proc.returncode, + ) if stdout: stdout = stdout.decode() @@ -42,4 +38,4 @@ async def run_async(program, args=None): "returncode": proc.returncode, "stdout": stdout, "stderr": stderr, - } + } diff --git a/python/common/src/util/unarchiver.py b/python/common/src/util/unarchiver.py index d1d54619..09c1c514 100644 --- a/python/common/src/util/unarchiver.py +++ b/python/common/src/util/unarchiver.py @@ -25,7 +25,8 @@ def extract_archive(file_path, **kwargs): Takes (str) file_path, and kwargs: - (list) members - list of (str) files to be extracted (all files are extracted if None) - (str) output_dir - directory to place the extracted files - - (str) fork_output_type - output type for resource forks; "visible" for *.rsrc files, "hidden" for ._* files + - (str) fork_output_type - output type for resource forks; + "visible" for *.rsrc files, "hidden" for ._* files Returns (dict) of extracted and skipped members """ members = kwargs.get("members") @@ -39,7 +40,9 @@ def extract_archive(file_path, **kwargs): if kwargs.get("fork_output_type"): if kwargs["fork_output_type"] not in FORK_OUTPUT_TYPES: - raise ValueError(f"Argument fork_output_type must be one of: {','.join(FORK_OUTPUT_TYPES)} ") + raise ValueError( + f"Argument fork_output_type must be one of: {','.join(FORK_OUTPUT_TYPES)} " + ) fork_output_type = kwargs["fork_output_type"] fork_output_type_args = ["-forks", fork_output_type or FORK_OUTPUT_TYPE_VISIBLE] else: @@ -53,9 +56,9 @@ def extract_archive(file_path, **kwargs): "-force-skip", "-no-directory", *fork_output_type_args, - '--', + "--", file_path, - ] + ] if members: for member in members: @@ -68,8 +71,10 @@ def extract_archive(file_path, **kwargs): unar_result_success = r'^Successfully extracted to "(?P.+)".$' unar_result_no_files = "No files extracted." - unar_file_extracted = \ - r"^ {2}(?P.+). \(((?P\d+) B)?(?P(dir)?(, )?(rsrc)?)\)\.\.\. (?P[A-Z]+)\.$" + unar_file_extracted = ( + r"^ {2}(?P.+). \(((?P\d+) B)?(?P(dir)?(, )?" + r"(rsrc)?)\)\.\.\. (?P[A-Z]+)\.$" + ) lines = process["stdout"].rstrip("\n").split("\n") @@ -90,7 +95,7 @@ def extract_archive(file_path, **kwargs): "is_dir": False, "is_resource_fork": False, "absolute_path": str(pathlib.PurePath(tmp_dir).joinpath(matches["path"])), - } + } member_types = matches.get("types", "") if member_types.startswith(", "): @@ -112,10 +117,14 @@ def extract_archive(file_path, **kwargs): member["name"] = f"._{member['name']}" else: member["name"] += ".rsrc" - member["path"] = str(pathlib.PurePath(member["path"]).parent.joinpath(member["name"])) - member["absolute_path"] = str(pathlib.PurePath(tmp_dir).joinpath(member["path"])) + member["path"] = str( + pathlib.PurePath(member["path"]).parent.joinpath(member["name"]) + ) + member["absolute_path"] = str( + pathlib.PurePath(tmp_dir).joinpath(member["path"]) + ) - logging.debug("Extracted: %s -> %s", member['path'], member['absolute_path']) + logging.debug("Extracted: %s -> %s", member["path"], member["absolute_path"]) extracted_members.append(member) else: raise UnarUnexpectedOutputError(f"Unexpected output: {line}") @@ -128,7 +137,10 @@ def extract_archive(file_path, **kwargs): member["absolute_path"] = str(target_path) if target_path.exists(): - logging.info("Skipping temp file/dir as the target already exists: %s", target_path) + logging.info( + "Skipping temp file/dir as the target already exists: %s", + target_path, + ) skipped.append(member) continue @@ -147,7 +159,7 @@ def extract_archive(file_path, **kwargs): return { "extracted": moved, "skipped": skipped, - } + } raise UnarUnexpectedOutputError(lines[-1]) @@ -171,37 +183,41 @@ def inspect_archive(file_path): except JSONDecodeError as error: raise LsarOutputError(f"Unable to read JSON output from lsar: {error.msg}") from error - members = [{ - "name": pathlib.PurePath(member.get("XADFileName")).name, - "path": member.get("XADFileName"), - "size": member.get("XADFileSize"), - "is_dir": member.get("XADIsDirectory"), - "is_resource_fork": member.get("XADIsResourceFork"), - "raw": member, - } for member in archive_info.get("lsarContents", [])] + members = [ + { + "name": pathlib.PurePath(member.get("XADFileName")).name, + "path": member.get("XADFileName"), + "size": member.get("XADFileSize"), + "is_dir": member.get("XADIsDirectory"), + "is_resource_fork": member.get("XADIsResourceFork"), + "raw": member, + } + for member in archive_info.get("lsarContents", []) + ] return { "format": archive_info.get("lsarFormatName"), "members": members, - } + } class UnarCommandError(Exception): - """ Command execution was unsuccessful """ + """Command execution was unsuccessful""" + pass class UnarNoFilesExtractedError(Exception): - """ Command completed, but no files extracted """ + """Command completed, but no files extracted""" class UnarUnexpectedOutputError(Exception): - """ Command output not recognized """ + """Command output not recognized""" class LsarCommandError(Exception): - """ Command execution was unsuccessful """ + """Command execution was unsuccessful""" class LsarOutputError(Exception): - """ Command output could not be parsed""" + """Command output could not be parsed""" diff --git a/python/ctrlboard/src/config.py b/python/ctrlboard/src/config.py index 02f61df2..8ebdb689 100644 --- a/python/ctrlboard/src/config.py +++ b/python/ctrlboard/src/config.py @@ -7,6 +7,7 @@ from ctrlboard_hw.ctrlboard_hw_constants import CtrlBoardHardwareConstants # pylint: disable=too-few-public-methods class CtrlboardConfig: """Class for central RaSCSI control board configuration parameters""" + ROTATION = 0 WIDTH = 128 HEIGHT = 64 diff --git a/python/ctrlboard/src/ctrlboard_event_handler/ctrlboard_menu_update_event_handler.py b/python/ctrlboard/src/ctrlboard_event_handler/ctrlboard_menu_update_event_handler.py index cdb9715c..558d6d20 100644 --- a/python/ctrlboard/src/ctrlboard_event_handler/ctrlboard_menu_update_event_handler.py +++ b/python/ctrlboard/src/ctrlboard_event_handler/ctrlboard_menu_update_event_handler.py @@ -19,8 +19,12 @@ from rascsi_menu_controller import RascsiMenuController class CtrlBoardMenuUpdateEventHandler(Observer): """Class interfacing the menu controller the RaSCSI Control Board hardware.""" - def __init__(self, menu_controller: RascsiMenuController, sock_cmd: SocketCmds, - ractl_cmd: RaCtlCmds): + def __init__( + self, + menu_controller: RascsiMenuController, + sock_cmd: SocketCmds, + ractl_cmd: RaCtlCmds, + ): self.message = None self._menu_controller = menu_controller self._menu_renderer_config = self._menu_controller.get_menu_renderer().get_config() @@ -73,16 +77,18 @@ class CtrlBoardMenuUpdateEventHandler(Observer): def handle_button1(self): """Method for handling the first cycle button (cycle profiles)""" if self.rascsi_profile_cycler is None: - self.rascsi_profile_cycler = RascsiProfileCycler(self._menu_controller, self.sock_cmd, - self.ractl_cmd, return_entry=True) + self.rascsi_profile_cycler = RascsiProfileCycler( + self._menu_controller, self.sock_cmd, self.ractl_cmd, return_entry=True + ) else: self.rascsi_profile_cycler.cycle() def handle_button2(self): """Method for handling the second cycle button (cycle shutdown)""" if self.rascsi_shutdown_cycler is None: - self.rascsi_shutdown_cycler = RascsiShutdownCycler(self._menu_controller, self.sock_cmd, - self.ractl_cmd) + self.rascsi_shutdown_cycler = RascsiShutdownCycler( + self._menu_controller, self.sock_cmd, self.ractl_cmd + ) else: self.rascsi_shutdown_cycler.cycle() @@ -100,8 +106,10 @@ class CtrlBoardMenuUpdateEventHandler(Observer): handler_function(info_object) except AttributeError: log = logging.getLogger(__name__) - log.error("Handler function [%s] not found or returned an error. Skipping.", - str(handler_function_name)) + log.error( + "Handler function [%s] not found or returned an error. Skipping.", + str(handler_function_name), + ) # noinspection PyUnusedLocal # pylint: disable=unused-argument @@ -109,9 +117,11 @@ class CtrlBoardMenuUpdateEventHandler(Observer): """Method handles the rotary button press with the scsi list to open the action menu.""" context_object = self._menu_controller.get_active_menu().get_current_info_object() self.context_stack.append(context_object) - self._menu_controller.segue(CtrlBoardMenuBuilder.ACTION_MENU, context_object=context_object, - transition_attributes=self._menu_renderer_config. - transition_attributes_left) + self._menu_controller.segue( + CtrlBoardMenuBuilder.ACTION_MENU, + context_object=context_object, + transition_attributes=self._menu_renderer_config.transition_attributes_left, + ) # noinspection PyUnusedLocal # pylint: disable=unused-argument @@ -119,9 +129,10 @@ class CtrlBoardMenuUpdateEventHandler(Observer): """Method handles the rotary button press to return from the action menu to the scsi list.""" self.context_stack.pop() - self._menu_controller.segue(CtrlBoardMenuBuilder.SCSI_ID_MENU, - transition_attributes=self._menu_renderer_config. - transition_attributes_right) + self._menu_controller.segue( + CtrlBoardMenuBuilder.SCSI_ID_MENU, + transition_attributes=self._menu_renderer_config.transition_attributes_right, + ) # noinspection PyUnusedLocal # pylint: disable=unused-argument @@ -129,9 +140,11 @@ class CtrlBoardMenuUpdateEventHandler(Observer): """Method handles the rotary button press on attach in the action menu.""" context_object = self._menu_controller.get_active_menu().context_object self.context_stack.append(context_object) - self._menu_controller.segue(CtrlBoardMenuBuilder.IMAGES_MENU, context_object=context_object, - transition_attributes=self._menu_renderer_config. - transition_attributes_left) + self._menu_controller.segue( + CtrlBoardMenuBuilder.IMAGES_MENU, + context_object=context_object, + transition_attributes=self._menu_renderer_config.transition_attributes_left, + ) # noinspection PyUnusedLocal def handle_action_menu_slot_detacheject(self, info_object): @@ -139,39 +152,43 @@ class CtrlBoardMenuUpdateEventHandler(Observer): context_object = self._menu_controller.get_active_menu().context_object self.detach_eject_scsi_id() self.context_stack = [] - self._menu_controller.segue(CtrlBoardMenuBuilder.SCSI_ID_MENU, - context_object=context_object, - transition_attributes=self._menu_renderer_config. - transition_attributes_right) + self._menu_controller.segue( + CtrlBoardMenuBuilder.SCSI_ID_MENU, + context_object=context_object, + transition_attributes=self._menu_renderer_config.transition_attributes_right, + ) # noinspection PyUnusedLocal def handle_action_menu_slot_info(self, info_object): """Method handles the rotary button press on 'Info' in the action menu.""" context_object = self._menu_controller.get_active_menu().context_object self.context_stack.append(context_object) - self._menu_controller.segue(CtrlBoardMenuBuilder.DEVICEINFO_MENU, - transition_attributes=self._menu_renderer_config. - transition_attributes_left, - context_object=context_object) + self._menu_controller.segue( + CtrlBoardMenuBuilder.DEVICEINFO_MENU, + transition_attributes=self._menu_renderer_config.transition_attributes_left, + context_object=context_object, + ) # noinspection PyUnusedLocal def handle_device_info_menu_return(self, info_object): """Method handles the rotary button press on 'Return' in the info menu.""" self.context_stack.pop() context_object = self._menu_controller.get_active_menu().context_object - self._menu_controller.segue(CtrlBoardMenuBuilder.ACTION_MENU, - transition_attributes=self._menu_renderer_config. - transition_attributes_right, - context_object=context_object) + self._menu_controller.segue( + CtrlBoardMenuBuilder.ACTION_MENU, + transition_attributes=self._menu_renderer_config.transition_attributes_right, + context_object=context_object, + ) # noinspection PyUnusedLocal def handle_action_menu_loadprofile(self, info_object): """Method handles the rotary button press on 'Load Profile' in the action menu.""" context_object = self._menu_controller.get_active_menu().context_object self.context_stack.append(context_object) - self._menu_controller.segue(CtrlBoardMenuBuilder.PROFILES_MENU, - transition_attributes=self._menu_renderer_config. - transition_attributes_left) + self._menu_controller.segue( + CtrlBoardMenuBuilder.PROFILES_MENU, + transition_attributes=self._menu_renderer_config.transition_attributes_left, + ) # noinspection PyUnusedLocal def handle_profiles_menu_loadprofile(self, info_object): @@ -186,28 +203,31 @@ class CtrlBoardMenuUpdateEventHandler(Observer): self._menu_controller.show_message("Loading failed!") self.context_stack = [] - self._menu_controller.segue(CtrlBoardMenuBuilder.SCSI_ID_MENU, - transition_attributes=self._menu_renderer_config. - transition_attributes_right) + self._menu_controller.segue( + CtrlBoardMenuBuilder.SCSI_ID_MENU, + transition_attributes=self._menu_renderer_config.transition_attributes_right, + ) # noinspection PyUnusedLocal def handle_action_menu_shutdown(self, info_object): """Method handles the rotary button press on 'Shutdown' in the action menu.""" self.ractl_cmd.shutdown("system") self._menu_controller.show_message("Shutting down!", 150) - self._menu_controller.segue(CtrlBoardMenuBuilder.SCSI_ID_MENU, - transition_attributes=self._menu_renderer_config. - transition_attributes_right) + self._menu_controller.segue( + CtrlBoardMenuBuilder.SCSI_ID_MENU, + transition_attributes=self._menu_renderer_config.transition_attributes_right, + ) # noinspection PyUnusedLocal def handle_images_menu_return(self, info_object): """Method handles the rotary button press on 'Return' in the image selection menu (through attach/insert).""" context_object = self.context_stack.pop() - self._menu_controller.segue(CtrlBoardMenuBuilder.ACTION_MENU, - context_object=context_object, - transition_attributes=self._menu_renderer_config. - transition_attributes_right) + self._menu_controller.segue( + CtrlBoardMenuBuilder.ACTION_MENU, + context_object=context_object, + transition_attributes=self._menu_renderer_config.transition_attributes_right, + ) def handle_images_menu_image_attachinsert(self, info_object): """Method handles the rotary button press on an image in the image selection menu @@ -215,10 +235,11 @@ class CtrlBoardMenuUpdateEventHandler(Observer): context_object = self._menu_controller.get_active_menu().context_object self.attach_insert_scsi_id(info_object) self.context_stack = [] - self._menu_controller.segue(CtrlBoardMenuBuilder.SCSI_ID_MENU, - context_object=context_object, - transition_attributes=self._menu_renderer_config. - transition_attributes_right) + self._menu_controller.segue( + CtrlBoardMenuBuilder.SCSI_ID_MENU, + context_object=context_object, + transition_attributes=self._menu_renderer_config.transition_attributes_right, + ) def attach_insert_scsi_id(self, info_object): """Helper method to attach/insert an image on a scsi id given through the menu context""" @@ -227,9 +248,9 @@ class CtrlBoardMenuUpdateEventHandler(Observer): context_object = self._menu_controller.get_active_menu().context_object scsi_id = context_object["scsi_id"] params = {"file": image_name} - result = self.ractl_cmd.attach_device(scsi_id=scsi_id, - device_type=device_type, - params=params) + result = self.ractl_cmd.attach_device( + scsi_id=scsi_id, device_type=device_type, params=params + ) if result["status"] is False: self._menu_controller.show_message("Attach failed!") @@ -268,7 +289,10 @@ class CtrlBoardMenuUpdateEventHandler(Observer): self._menu_controller.show_message("Detach failed!") else: log = logging.getLogger(__name__) - log.info("Device type '%s' currently unsupported for detach/eject!", str(device_type)) + log.info( + "Device type '%s' currently unsupported for detach/eject!", + str(device_type), + ) def show_id_action_message(self, scsi_id, action: str): """Helper method for displaying an action message in the case of an exception.""" diff --git a/python/ctrlboard/src/ctrlboard_event_handler/ctrlboard_print_event_handler.py b/python/ctrlboard/src/ctrlboard_event_handler/ctrlboard_print_event_handler.py index 5992d383..dd5f080d 100644 --- a/python/ctrlboard/src/ctrlboard_event_handler/ctrlboard_print_event_handler.py +++ b/python/ctrlboard/src/ctrlboard_event_handler/ctrlboard_print_event_handler.py @@ -8,6 +8,7 @@ from ctrlboard_hw.encoder import Encoder class CtrlBoardPrintEventHandler(observer.Observer): """Class implements a basic event handler that prints button presses from the RaSCSI Control Board hardware.""" + def update(self, updated_object): if isinstance(updated_object, HardwareButton): print(updated_object.name + " has been pressed!") diff --git a/python/ctrlboard/src/ctrlboard_event_handler/rascsi_shutdown_cycler.py b/python/ctrlboard/src/ctrlboard_event_handler/rascsi_shutdown_cycler.py index 5c772195..6a31623d 100644 --- a/python/ctrlboard/src/ctrlboard_event_handler/rascsi_shutdown_cycler.py +++ b/python/ctrlboard/src/ctrlboard_event_handler/rascsi_shutdown_cycler.py @@ -6,8 +6,13 @@ class RascsiShutdownCycler(Cycler): """Class implementing the shutdown cycler for the RaSCSI Control Board UI""" def __init__(self, menu_controller, sock_cmd, ractl_cmd): - super().__init__(menu_controller, sock_cmd, ractl_cmd, return_entry=True, - empty_messages=False) + super().__init__( + menu_controller, + sock_cmd, + ractl_cmd, + return_entry=True, + empty_messages=False, + ) self.executed_once = False def populate_cycle_entries(self): diff --git a/python/ctrlboard/src/ctrlboard_hw/ctrlboard_hw.py b/python/ctrlboard/src/ctrlboard_hw/ctrlboard_hw.py index e5486feb..e561a5fd 100644 --- a/python/ctrlboard/src/ctrlboard_hw/ctrlboard_hw.py +++ b/python/ctrlboard/src/ctrlboard_hw/ctrlboard_hw.py @@ -16,6 +16,7 @@ from observable import Observable # pylint: disable=too-many-instance-attributes class CtrlBoardHardware(Observable): """Class implements the RaSCSI Control Board hardware and provides an interface to it.""" + def __init__(self, display_i2c_address, pca9554_i2c_address, debounce_ms=200): self.display_i2c_address = display_i2c_address self.pca9554_i2c_address = pca9554_i2c_address @@ -33,63 +34,78 @@ class CtrlBoardHardware(Observable): self.pca_driver = pca9554multiplexer.PCA9554Multiplexer(self.pca9554_i2c_address) # setup pca9554 - self.pca_driver.write_configuration_register_port(CtrlBoardHardwareConstants. - PCA9554_PIN_ENC_A, - PCA9554Multiplexer.PIN_ENABLED_AS_INPUT) - self.pca_driver.write_configuration_register_port(CtrlBoardHardwareConstants. - PCA9554_PIN_ENC_B, - PCA9554Multiplexer.PIN_ENABLED_AS_INPUT) - self.pca_driver.write_configuration_register_port(CtrlBoardHardwareConstants. - PCA9554_PIN_BUTTON_1, - PCA9554Multiplexer.PIN_ENABLED_AS_INPUT) - self.pca_driver.write_configuration_register_port(CtrlBoardHardwareConstants. - PCA9554_PIN_BUTTON_2, - PCA9554Multiplexer.PIN_ENABLED_AS_INPUT) - self.pca_driver.write_configuration_register_port(CtrlBoardHardwareConstants. - PCA9554_PIN_BUTTON_ROTARY, - PCA9554Multiplexer.PIN_ENABLED_AS_INPUT) - self.pca_driver.write_configuration_register_port(CtrlBoardHardwareConstants. - PCA9554_PIN_LED_1, - PCA9554Multiplexer.PIN_ENABLED_AS_OUTPUT) - self.pca_driver.write_configuration_register_port(CtrlBoardHardwareConstants. - PCA9554_PIN_LED_2, - PCA9554Multiplexer.PIN_ENABLED_AS_OUTPUT) + self.pca_driver.write_configuration_register_port( + CtrlBoardHardwareConstants.PCA9554_PIN_ENC_A, + PCA9554Multiplexer.PIN_ENABLED_AS_INPUT, + ) + self.pca_driver.write_configuration_register_port( + CtrlBoardHardwareConstants.PCA9554_PIN_ENC_B, + PCA9554Multiplexer.PIN_ENABLED_AS_INPUT, + ) + self.pca_driver.write_configuration_register_port( + CtrlBoardHardwareConstants.PCA9554_PIN_BUTTON_1, + PCA9554Multiplexer.PIN_ENABLED_AS_INPUT, + ) + self.pca_driver.write_configuration_register_port( + CtrlBoardHardwareConstants.PCA9554_PIN_BUTTON_2, + PCA9554Multiplexer.PIN_ENABLED_AS_INPUT, + ) + self.pca_driver.write_configuration_register_port( + CtrlBoardHardwareConstants.PCA9554_PIN_BUTTON_ROTARY, + PCA9554Multiplexer.PIN_ENABLED_AS_INPUT, + ) + self.pca_driver.write_configuration_register_port( + CtrlBoardHardwareConstants.PCA9554_PIN_LED_1, + PCA9554Multiplexer.PIN_ENABLED_AS_OUTPUT, + ) + self.pca_driver.write_configuration_register_port( + CtrlBoardHardwareConstants.PCA9554_PIN_LED_2, + PCA9554Multiplexer.PIN_ENABLED_AS_OUTPUT, + ) self.input_register_buffer = 0 # pylint: disable=no-member GPIO.setmode(GPIO.BCM) GPIO.setup(CtrlBoardHardwareConstants.PI_PIN_INTERRUPT, GPIO.IN) - GPIO.add_event_detect(CtrlBoardHardwareConstants.PI_PIN_INTERRUPT, GPIO.FALLING, - callback=self.button_pressed_callback) + GPIO.add_event_detect( + CtrlBoardHardwareConstants.PI_PIN_INTERRUPT, + GPIO.FALLING, + callback=self.button_pressed_callback, + ) # configure button of the rotary encoder - self.rotary_button = HardwareButton(self.pca_driver, - CtrlBoardHardwareConstants.PCA9554_PIN_BUTTON_ROTARY) + self.rotary_button = HardwareButton( + self.pca_driver, CtrlBoardHardwareConstants.PCA9554_PIN_BUTTON_ROTARY + ) self.rotary_button.state = True self.rotary_button.name = CtrlBoardHardwareConstants.ROTARY_BUTTON # configure button 1 - self.button1 = HardwareButton(self.pca_driver, - CtrlBoardHardwareConstants.PCA9554_PIN_BUTTON_1) + self.button1 = HardwareButton( + self.pca_driver, CtrlBoardHardwareConstants.PCA9554_PIN_BUTTON_1 + ) self.button1.state = True self.button1.name = CtrlBoardHardwareConstants.BUTTON_1 # configure button 2 - self.button2 = HardwareButton(self.pca_driver, - CtrlBoardHardwareConstants.PCA9554_PIN_BUTTON_2) + self.button2 = HardwareButton( + self.pca_driver, CtrlBoardHardwareConstants.PCA9554_PIN_BUTTON_2 + ) self.button2.state = True self.button2.name = CtrlBoardHardwareConstants.BUTTON_2 # configure rotary encoder pin a - self.rotary_a = HardwareButton(self.pca_driver, - CtrlBoardHardwareConstants.PCA9554_PIN_ENC_A) + self.rotary_a = HardwareButton( + self.pca_driver, CtrlBoardHardwareConstants.PCA9554_PIN_ENC_A + ) self.rotary_a.state = True self.rotary_a.directionalTransition = False self.rotary_a.name = CtrlBoardHardwareConstants.ROTARY_A # configure rotary encoder pin b - self.rotary_b = HardwareButton(self.pca_driver, - CtrlBoardHardwareConstants.PCA9554_PIN_ENC_B) + self.rotary_b = HardwareButton( + self.pca_driver, CtrlBoardHardwareConstants.PCA9554_PIN_ENC_B + ) self.rotary_b.state = True self.rotary_b.directionalTransition = False self.rotary_b.name = CtrlBoardHardwareConstants.ROTARY_B @@ -117,7 +133,7 @@ class CtrlBoardHardware(Observable): # ignore button press if debounce time is not reached if button.last_press is not None: - elapsed = (time.time_ns() - button.last_press)/1000000 + elapsed = (time.time_ns() - button.last_press) / 1000000 if elapsed < self.debounce_ms: return @@ -149,8 +165,8 @@ class CtrlBoardHardware(Observable): @staticmethod def button_value_shifted_list(input_register_buffer, bit): """Helper method for dealing with multiple buffered input registers""" - input_register_buffer_length = int(len(format(input_register_buffer, 'b'))/8) - shiftval = (input_register_buffer_length-1)*8 + input_register_buffer_length = int(len(format(input_register_buffer, "b")) / 8) + shiftval = (input_register_buffer_length - 1) * 8 tmp = input_register_buffer >> shiftval bitmask = 1 << bit tmp &= bitmask @@ -162,12 +178,12 @@ class CtrlBoardHardware(Observable): input_register_buffer = self.input_register_buffer self.input_register_buffer = 0 - input_register_buffer_length = int(len(format(input_register_buffer, 'b'))/8) + input_register_buffer_length = int(len(format(input_register_buffer, "b")) / 8) if input_register_buffer_length < 1: return for i in range(0, input_register_buffer_length): - shiftval = (input_register_buffer_length-1-i)*8 + shiftval = (input_register_buffer_length - 1 - i) * 8 input_register = (input_register_buffer >> shiftval) & 0b11111111 rot_a = self.button_value(input_register, 0) @@ -211,7 +227,7 @@ class CtrlBoardHardware(Observable): if 2 < _address < 120: try: _bus.read_byte(_address) - address = '%02x' % _address + address = "%02x" % _address detected_i2c_addresses.append(int(address, base=16)) except IOError: # simply skip unsuccessful i2c probes pass @@ -223,8 +239,12 @@ class CtrlBoardHardware(Observable): the expected i2c addresses are detected.""" # pylint: disable=c-extension-no-member i2c_addresses = self.detect_i2c_devices(smbus.SMBus(1)) - return bool((int(self.display_i2c_address) in i2c_addresses and - (int(self.pca9554_i2c_address) in i2c_addresses))) + return bool( + ( + int(self.display_i2c_address) in i2c_addresses + and (int(self.pca9554_i2c_address) in i2c_addresses) + ) + ) def detect_display(self): """Detects whether an i2c display is connected to the RaSCSI hat.""" diff --git a/python/ctrlboard/src/ctrlboard_hw/ctrlboard_hw_constants.py b/python/ctrlboard/src/ctrlboard_hw/ctrlboard_hw_constants.py index d1c4606e..888f3053 100644 --- a/python/ctrlboard/src/ctrlboard_hw/ctrlboard_hw_constants.py +++ b/python/ctrlboard/src/ctrlboard_hw/ctrlboard_hw_constants.py @@ -4,8 +4,9 @@ # pylint: disable=too-few-public-methods class CtrlBoardHardwareConstants: """Class containing the RaSCSI Control Board hardware constants""" - DISPLAY_I2C_ADDRESS = 0x3c - PCA9554_I2C_ADDRESS = 0x3f + + DISPLAY_I2C_ADDRESS = 0x3C + PCA9554_I2C_ADDRESS = 0x3F PCA9554_PIN_ENC_A = 0 PCA9554_PIN_ENC_B = 1 PCA9554_PIN_BUTTON_1 = 2 @@ -14,7 +15,7 @@ class CtrlBoardHardwareConstants: PCA9554_PIN_LED_1 = 6 PCA9554_PIN_LED_2 = 7 - PI_PIN_INTERRUPT = 9 # BCM + PI_PIN_INTERRUPT = 9 # BCM BUTTON_1 = "Bt1" BUTTON_2 = "Bt2" diff --git a/python/ctrlboard/src/ctrlboard_hw/encoder.py b/python/ctrlboard/src/ctrlboard_hw/encoder.py index f5568f33..18166034 100644 --- a/python/ctrlboard/src/ctrlboard_hw/encoder.py +++ b/python/ctrlboard/src/ctrlboard_hw/encoder.py @@ -6,6 +6,7 @@ from ctrlboard_hw.hardware_button import HardwareButton class Encoder: """Class implementing a detection mechanism to detect the rotary encoder directions through the i2c multiplexer + interrupt""" + def __init__(self, enc_a: HardwareButton, enc_b: HardwareButton): self.enc_a = enc_a self.enc_b = enc_b @@ -27,16 +28,27 @@ class Encoder: state |= 0b10000000 # clockwise pattern detection - if (state == 0b11010010 or state == 0b11001000 or state == 0b11011000 or - state == 0b11010001 or state == 0b11011011 or state == 0b11100000 or - state == 0b11001011): + if ( + state == 0b11010010 + or state == 0b11001000 + or state == 0b11011000 + or state == 0b11010001 + or state == 0b11011011 + or state == 0b11100000 + or state == 0b11001011 + ): self.pos += 1 self.direction = 1 self.state = 0b00000000 return # counter-clockwise pattern detection - elif (state == 0b11000100 or state == 0b11100100 or state == 0b11100001 or - state == 0b11000111 or state == 0b11100111): + elif ( + state == 0b11000100 + or state == 0b11100100 + or state == 0b11100001 + or state == 0b11000111 + or state == 0b11100111 + ): self.pos -= 1 self.direction = -1 self.state = 0b00000000 diff --git a/python/ctrlboard/src/ctrlboard_hw/pca9554multiplexer.py b/python/ctrlboard/src/ctrlboard_hw/pca9554multiplexer.py index bf25fad6..e03da186 100644 --- a/python/ctrlboard/src/ctrlboard_hw/pca9554multiplexer.py +++ b/python/ctrlboard/src/ctrlboard_hw/pca9554multiplexer.py @@ -19,8 +19,10 @@ class PCA9554Multiplexer: try: self.i2c_bus = smbus.SMBus(1) if self.read_input_register() is None: - logging.error("PCA9554 initialization test on specified i2c address %s failed", - self.i2c_address) + logging.error( + "PCA9554 initialization test on specified i2c address %s failed", + self.i2c_address, + ) self.i2c_bus = None except IOError: logging.error("Could not open the i2c bus.") @@ -35,8 +37,9 @@ class PCA9554Multiplexer: if bit_value: updated_configuration_register = configuration_register | (1 << port_bit) else: - updated_configuration_register = configuration_register & (0xFF - - (1 << port_bit)) + updated_configuration_register = configuration_register & ( + 0xFF - (1 << port_bit) + ) self.i2c_bus.write_byte_data(self.i2c_address, 3, updated_configuration_register) return True diff --git a/python/ctrlboard/src/ctrlboard_menu_builder.py b/python/ctrlboard/src/ctrlboard_menu_builder.py index 160eb2bd..95962942 100644 --- a/python/ctrlboard/src/ctrlboard_menu_builder.py +++ b/python/ctrlboard/src/ctrlboard_menu_builder.py @@ -9,6 +9,7 @@ from rascsi.ractl_cmds import RaCtlCmds class CtrlBoardMenuBuilder(MenuBuilder): """Class fgor building the control board UI specific menus""" + SCSI_ID_MENU = "scsi_id_menu" ACTION_MENU = "action_menu" IMAGES_MENU = "images_menu" @@ -27,8 +28,12 @@ class CtrlBoardMenuBuilder(MenuBuilder): def __init__(self, ractl_cmd: RaCtlCmds): super().__init__() self._rascsi_client = ractl_cmd - self.file_cmd = FileCmds(sock_cmd=ractl_cmd.sock_cmd, ractl=ractl_cmd, - token=ractl_cmd.token, locale=ractl_cmd.locale) + self.file_cmd = FileCmds( + sock_cmd=ractl_cmd.sock_cmd, + ractl=ractl_cmd, + token=ractl_cmd.token, + locale=ractl_cmd.locale, + ) def build(self, name: str, context_object=None) -> Menu: if name == CtrlBoardMenuBuilder.SCSI_ID_MENU: @@ -93,9 +98,14 @@ class CtrlBoardMenuBuilder(MenuBuilder): if device_type != "": menu_str += " [" + device_type + "]" - menu.add_entry(menu_str, {"context": self.SCSI_ID_MENU, - "action": self.ACTION_OPENACTIONMENU, - "scsi_id": scsi_id}) + menu.add_entry( + menu_str, + { + "context": self.SCSI_ID_MENU, + "action": self.ACTION_OPENACTIONMENU, + "scsi_id": scsi_id, + }, + ) return menu @@ -103,18 +113,30 @@ class CtrlBoardMenuBuilder(MenuBuilder): def create_action_menu(self, context_object=None): """Method creates the action submenu with action that can be performed on a scsi slot""" menu = Menu(CtrlBoardMenuBuilder.ACTION_MENU) - menu.add_entry("Return", {"context": self.ACTION_MENU, - "action": self.ACTION_RETURN}) - menu.add_entry("Attach/Insert", {"context": self.ACTION_MENU, - "action": self.ACTION_SLOT_ATTACHINSERT}) - menu.add_entry("Detach/Eject", {"context": self.ACTION_MENU, - "action": self.ACTION_SLOT_DETACHEJECT}) - menu.add_entry("Info", {"context": self.ACTION_MENU, - "action": self.ACTION_SLOT_INFO}) - menu.add_entry("Load Profile", {"context": self.ACTION_MENU, - "action": self.ACTION_LOADPROFILE}) - menu.add_entry("Shutdown", {"context": self.ACTION_MENU, - "action": self.ACTION_SHUTDOWN}) + menu.add_entry( + "Return", + {"context": self.ACTION_MENU, "action": self.ACTION_RETURN}, + ) + menu.add_entry( + "Attach/Insert", + {"context": self.ACTION_MENU, "action": self.ACTION_SLOT_ATTACHINSERT}, + ) + menu.add_entry( + "Detach/Eject", + {"context": self.ACTION_MENU, "action": self.ACTION_SLOT_DETACHEJECT}, + ) + menu.add_entry( + "Info", + {"context": self.ACTION_MENU, "action": self.ACTION_SLOT_INFO}, + ) + menu.add_entry( + "Load Profile", + {"context": self.ACTION_MENU, "action": self.ACTION_LOADPROFILE}, + ) + menu.add_entry( + "Shutdown", + {"context": self.ACTION_MENU, "action": self.ACTION_SHUTDOWN}, + ) return menu def create_images_menu(self, context_object=None): @@ -123,12 +145,15 @@ class CtrlBoardMenuBuilder(MenuBuilder): images_info = self.file_cmd.list_images() menu.add_entry("Return", {"context": self.IMAGES_MENU, "action": self.ACTION_RETURN}) images = images_info["files"] - sorted_images = sorted(images, key=lambda d: d['name']) + sorted_images = sorted(images, key=lambda d: d["name"]) for image in sorted_images: image_str = image["name"] + " [" + image["detected_type"] + "]" - image_context = {"context": self.IMAGES_MENU, "name": str(image["name"]), - "device_type": str(image["detected_type"]), - "action": self.ACTION_IMAGE_ATTACHINSERT} + image_context = { + "context": self.IMAGES_MENU, + "name": str(image["name"]), + "device_type": str(image["detected_type"]), + "action": self.ACTION_IMAGE_ATTACHINSERT, + } menu.add_entry(image_str, image_context) return menu @@ -138,9 +163,14 @@ class CtrlBoardMenuBuilder(MenuBuilder): menu.add_entry("Return", {"context": self.IMAGES_MENU, "action": self.ACTION_RETURN}) config_files = self.file_cmd.list_config_files() for config_file in config_files: - menu.add_entry(str(config_file), - {"context": self.PROFILES_MENU, "name": str(config_file), - "action": self.ACTION_LOADPROFILE}) + menu.add_entry( + str(config_file), + { + "context": self.PROFILES_MENU, + "name": str(config_file), + "action": self.ACTION_LOADPROFILE, + }, + ) return menu diff --git a/python/ctrlboard/src/main.py b/python/ctrlboard/src/main.py index 403a7923..c9ceb333 100644 --- a/python/ctrlboard/src/main.py +++ b/python/ctrlboard/src/main.py @@ -6,14 +6,17 @@ import logging from config import CtrlboardConfig from ctrlboard_hw.ctrlboard_hw import CtrlBoardHardware from ctrlboard_hw.ctrlboard_hw_constants import CtrlBoardHardwareConstants -from ctrlboard_event_handler.ctrlboard_menu_update_event_handler \ - import CtrlBoardMenuUpdateEventHandler +from ctrlboard_event_handler.ctrlboard_menu_update_event_handler import ( + CtrlBoardMenuUpdateEventHandler, +) from ctrlboard_menu_builder import CtrlBoardMenuBuilder from menu.menu_renderer_config import MenuRendererConfig from menu.menu_renderer_luma_oled import MenuRendererLumaOled -from rascsi.exceptions import (EmptySocketChunkException, - InvalidProtobufResponse, - FailedSocketConnectionException) +from rascsi.exceptions import ( + EmptySocketChunkException, + InvalidProtobufResponse, + FailedSocketConnectionException, +) from rascsi.ractl_cmds import RaCtlCmds from rascsi.socket_cmds import SocketCmds @@ -23,7 +26,7 @@ from rascsi_menu_controller import RascsiMenuController def parse_config(): """Parses the command line parameters and configured the RaSCSI Control Board UI accordingly""" config = CtrlboardConfig() - cmdline_args_parser = argparse.ArgumentParser(description='RaSCSI ctrlboard service') + cmdline_args_parser = argparse.ArgumentParser(description="RaSCSI ctrlboard service") cmdline_args_parser.add_argument( "--rotation", type=int, @@ -68,7 +71,7 @@ def parse_config(): default=logging.WARN, action="store", help="Loglevel. Valid values: 0 (notset), 10 (debug), 30 (warning), " - "40 (error), 50 (critical). Default: Warning", + "40 (error), 50 (critical). Default: Warning", ) cmdline_args_parser.add_argument( "--transitions", @@ -115,14 +118,14 @@ def main(): config = parse_config() log_format = "%(asctime)s:%(name)s:%(levelname)s - %(message)s" - logging.basicConfig(stream=sys.stdout, - format=log_format, - level=config.LOG_LEVEL) + logging.basicConfig(stream=sys.stdout, format=log_format, level=config.LOG_LEVEL) log = logging.getLogger(__name__) log.debug("RaSCSI ctrlboard service started.") - ctrlboard_hw = CtrlBoardHardware(display_i2c_address=config.DISPLAY_I2C_ADDRESS, - pca9554_i2c_address=config.PCA9554_I2C_ADDRESS) + ctrlboard_hw = CtrlBoardHardware( + display_i2c_address=config.DISPLAY_I2C_ADDRESS, + pca9554_i2c_address=config.PCA9554_I2C_ADDRESS, + ) # for now, we require the complete rascsi ctrlboard hardware. # Oled only will be supported as well at some later point in time. @@ -143,8 +146,10 @@ def main(): exit(1) if check_rascsi_connection(ractl_cmd) is False: - log.error("Communication with RaSCSI failed. Please check if password token must be set " - "and whether is set correctly.") + log.error( + "Communication with RaSCSI failed. Please check if password token must be set " + "and whether is set correctly." + ) exit(1) menu_renderer_config = MenuRendererConfig() @@ -156,18 +161,21 @@ def main(): menu_renderer_config.rotation = config.ROTATION menu_builder = CtrlBoardMenuBuilder(ractl_cmd) - menu_controller = RascsiMenuController(config.MENU_REFRESH_INTERVAL, menu_builder=menu_builder, - menu_renderer=MenuRendererLumaOled(menu_renderer_config), - menu_renderer_config=menu_renderer_config) + menu_controller = RascsiMenuController( + config.MENU_REFRESH_INTERVAL, + menu_builder=menu_builder, + menu_renderer=MenuRendererLumaOled(menu_renderer_config), + menu_renderer_config=menu_renderer_config, + ) menu_controller.add(CtrlBoardMenuBuilder.SCSI_ID_MENU) menu_controller.add(CtrlBoardMenuBuilder.ACTION_MENU) - menu_controller.show_splash_screen(f"resources/splash_start_64.bmp") + menu_controller.show_splash_screen("resources/splash_start_64.bmp") - menu_update_event_handler = CtrlBoardMenuUpdateEventHandler(menu_controller, - sock_cmd=sock_cmd, - ractl_cmd=ractl_cmd) + menu_update_event_handler = CtrlBoardMenuUpdateEventHandler( + menu_controller, sock_cmd=sock_cmd, ractl_cmd=ractl_cmd + ) ctrlboard_hw.attach(menu_update_event_handler) menu_controller.set_active_menu(CtrlBoardMenuBuilder.SCSI_ID_MENU) @@ -184,5 +192,5 @@ def main(): print(ex) -if __name__ == '__main__': +if __name__ == "__main__": main() diff --git a/python/ctrlboard/src/menu/blank_screensaver.py b/python/ctrlboard/src/menu/blank_screensaver.py index 3f253f18..bc55eaca 100644 --- a/python/ctrlboard/src/menu/blank_screensaver.py +++ b/python/ctrlboard/src/menu/blank_screensaver.py @@ -5,6 +5,7 @@ from menu.screensaver import ScreenSaver class BlankScreenSaver(ScreenSaver): """Class implementing a blank screen safer that simply blanks the screen after a configured activation delay""" + def __init__(self, activation_delay, menu_renderer): super().__init__(activation_delay, menu_renderer) self._initial_draw_call = None diff --git a/python/ctrlboard/src/menu/cycler.py b/python/ctrlboard/src/menu/cycler.py index f1744227..c0249a7a 100644 --- a/python/ctrlboard/src/menu/cycler.py +++ b/python/ctrlboard/src/menu/cycler.py @@ -8,9 +8,17 @@ class Cycler: """Class implementing button cycling functionality. Message is shown at the center of the screen where repeated button presses cycle through the available selection possibilities. Inactivity (cycle_timeout) actives cycle entry last shown on the screen.""" - def __init__(self, menu_controller, sock_cmd, ractl_cmd, - cycle_timeout=3, return_string="Return ->", - return_entry=True, empty_messages=True): + + def __init__( + self, + menu_controller, + sock_cmd, + ractl_cmd, + cycle_timeout=3, + return_string="Return ->", + return_entry=True, + empty_messages=True, + ): self._cycle_profile_timer_flag = Timer(activation_delay=cycle_timeout) self._menu_controller = menu_controller self.sock_cmd = sock_cmd @@ -39,7 +47,7 @@ class Cycler: """Perform the return action, i.e., when no selection is chosen""" def update(self): - """ Returns True if object has completed its task and can be deleted """ + """Returns True if object has completed its task and can be deleted""" if self._cycle_profile_timer_flag is None: return None diff --git a/python/ctrlboard/src/menu/menu.py b/python/ctrlboard/src/menu/menu.py index 17bbdafb..ba465373 100644 --- a/python/ctrlboard/src/menu/menu.py +++ b/python/ctrlboard/src/menu/menu.py @@ -4,6 +4,7 @@ from typing import List class Menu: """Class implement the Menu class""" + def __init__(self, name: str): self.entries: List = [] self.item_selection = 0 @@ -17,11 +18,11 @@ class Menu: def get_current_text(self): """Returns the text content of the currently selected text in the menu.""" - return self.entries[self.item_selection]['text'] + return self.entries[self.item_selection]["text"] def get_current_info_object(self): """Returns the data object to the currently selected menu item""" - return self.entries[self.item_selection]['data_object'] + return self.entries[self.item_selection]["data_object"] def __repr__(self): print("entries: " + str(self.entries)) diff --git a/python/ctrlboard/src/menu/menu_builder.py b/python/ctrlboard/src/menu/menu_builder.py index 6bf680e4..ce2f0ca2 100644 --- a/python/ctrlboard/src/menu/menu_builder.py +++ b/python/ctrlboard/src/menu/menu_builder.py @@ -6,6 +6,7 @@ from menu.menu import Menu # pylint: disable=too-few-public-methods class MenuBuilder(ABC): """Base class for menu builders""" + def __init__(self): pass diff --git a/python/ctrlboard/src/menu/menu_renderer.py b/python/ctrlboard/src/menu/menu_renderer.py index 8f7c17bf..380fe401 100644 --- a/python/ctrlboard/src/menu/menu_renderer.py +++ b/python/ctrlboard/src/menu/menu_renderer.py @@ -18,6 +18,7 @@ from menu.screensaver import ScreenSaver class MenuRenderer(ABC): """The abstract menu renderer class provides the base for concrete menu renderer classes that implement functionality based on conrete hardware or available APIs.""" + def __init__(self, config: MenuRendererConfig): self.message = "" self.mini_message = "" @@ -25,7 +26,7 @@ class MenuRenderer(ABC): self._config = config self.disp = self.display_init() - self.image = Image.new('1', (self.disp.width, self.disp.height)) + self.image = Image.new("1", (self.disp.width, self.disp.height)) self.draw = ImageDraw.Draw(self.image) self.font = ImageFont.truetype(config.font_path, size=config.font_size) # just a sample text to work with the font height @@ -83,14 +84,21 @@ class MenuRenderer(ABC): def draw_row(self, row_number: int, text: str, selected: bool): """Draws a single row of the menu.""" x_pos = 0 - y_pos = row_number*self.font_height + y_pos = row_number * self.font_height if selected: selection_extension = 0 if row_number < self.rows_per_screen(): selection_extension = self._config.row_selection_pixel_extension - self.draw.rectangle((x_pos, y_pos, self.disp.width, - y_pos+self._config.font_size+selection_extension), - outline=0, fill=255) + self.draw.rectangle( + ( + x_pos, + y_pos, + self.disp.width, + y_pos + self._config.font_size + selection_extension, + ), + outline=0, + fill=255, + ) # in stage 1, we initialize scrolling for the currently selected line if self._perform_scrolling_stage == 1: @@ -99,9 +107,9 @@ class MenuRenderer(ABC): # in stage 2, we know the details and can thus perform the scrolling to the left if self._perform_scrolling_stage == 2: - if self._current_line_horizontal_overlap+self._x_scrolling > 0: + if self._current_line_horizontal_overlap + self._x_scrolling > 0: self._x_scrolling -= 1 - if self._current_line_horizontal_overlap+self._x_scrolling == 0: + if self._current_line_horizontal_overlap + self._x_scrolling == 0: self._stage_timestamp = int(time.time()) self._perform_scrolling_stage = 3 @@ -115,11 +123,12 @@ class MenuRenderer(ABC): # in stage 4, we scroll back to the right if self._perform_scrolling_stage == 4: - if self._current_line_horizontal_overlap+self._x_scrolling >= 0: + if self._current_line_horizontal_overlap + self._x_scrolling >= 0: self._x_scrolling += 1 - if (self._current_line_horizontal_overlap + - self._x_scrolling) == self._current_line_horizontal_overlap: + if ( + self._current_line_horizontal_overlap + self._x_scrolling + ) == self._current_line_horizontal_overlap: self._stage_timestamp = int(time.time()) self._perform_scrolling_stage = 5 @@ -131,8 +140,14 @@ class MenuRenderer(ABC): self._stage_timestamp = None self._perform_scrolling_stage = 2 - self.draw.text((x_pos+self._x_scrolling, y_pos), text, font=self.font, - spacing=0, stroke_fill=0, fill=0) + self.draw.text( + (x_pos + self._x_scrolling, y_pos), + text, + font=self.font, + spacing=0, + stroke_fill=0, + fill=0, + ) else: self.draw.text((x_pos, y_pos), text, font=self.font, spacing=0, stroke_fill=0, fill=255) @@ -143,8 +158,15 @@ class MenuRenderer(ABC): centered_height = (self.disp.height - font_height) / 2 self.draw.rectangle((0, 0, self.disp.width, self.disp.height), outline=0, fill=255) - self.draw.text((centered_width, centered_height), text, align="center", font=self.font, - stroke_fill=0, fill=0, textsize=20) + self.draw.text( + (centered_width, centered_height), + text, + align="center", + font=self.font, + stroke_fill=0, + fill=0, + textsize=20, + ) def draw_mini_message(self, text: str): """Draws a fullscreen message, i.e., a message covering only the center portion of @@ -153,16 +175,33 @@ class MenuRenderer(ABC): centered_width = (self.disp.width - font_width) / 2 centered_height = (self.disp.height - self.font_height) / 2 - self.draw.rectangle((0, centered_height-4, self.disp.width, - centered_height+self.font_height+4), outline=0, fill=255) - self.draw.text((centered_width, centered_height), text, align="center", font=self.font, - stroke_fill=0, fill=0, textsize=20) + self.draw.rectangle( + ( + 0, + centered_height - 4, + self.disp.width, + centered_height + self.font_height + 4, + ), + outline=0, + fill=255, + ) + self.draw.text( + (centered_width, centered_height), + text, + align="center", + font=self.font, + stroke_fill=0, + fill=0, + textsize=20, + ) def draw_menu(self): """Method draws the menu set to the class instance.""" if self._menu.item_selection >= self.frame_start_row + self.rows_per_screen(): if self._config.scroll_behavior == "page": - self.frame_start_row = self.frame_start_row + (round(self.rows_per_screen()/2)) + 1 + self.frame_start_row = ( + self.frame_start_row + (round(self.rows_per_screen() / 2)) + 1 + ) if self.frame_start_row > len(self._menu.entries) - self.rows_per_screen(): self.frame_start_row = len(self._menu.entries) - self.rows_per_screen() else: # extend as default behavior @@ -170,13 +209,15 @@ class MenuRenderer(ABC): if self._menu.item_selection < self.frame_start_row: if self._config.scroll_behavior == "page": - self.frame_start_row = self.frame_start_row - (round(self.rows_per_screen()/2)) - 1 + self.frame_start_row = ( + self.frame_start_row - (round(self.rows_per_screen() / 2)) - 1 + ) if self.frame_start_row < 0: self.frame_start_row = 0 else: # extend as default behavior self.frame_start_row = self._menu.item_selection - self.draw_menu_frame(self.frame_start_row, self.frame_start_row+self.rows_per_screen()) + self.draw_menu_frame(self.frame_start_row, self.frame_start_row + self.rows_per_screen()) def draw_menu_frame(self, frame_start_row: int, frame_end_row: int): """Draws row frame_start_row to frame_end_row of the class instance menu, i.e., it diff --git a/python/ctrlboard/src/menu/menu_renderer_adafruit_ssd1306.py b/python/ctrlboard/src/menu/menu_renderer_adafruit_ssd1306.py index d5d4c2c8..73646fc0 100644 --- a/python/ctrlboard/src/menu/menu_renderer_adafruit_ssd1306.py +++ b/python/ctrlboard/src/menu/menu_renderer_adafruit_ssd1306.py @@ -11,8 +11,9 @@ class MenuRendererAdafruitSSD1306(MenuRenderer): def display_init(self): i2c = busio.I2C(SCL, SDA) - self.disp = adafruit_ssd1306.SSD1306_I2C(self._config.width, self._config.height, i2c, - addr=self._config.i2c_address) + self.disp = adafruit_ssd1306.SSD1306_I2C( + self._config.width, self._config.height, i2c, addr=self._config.i2c_address + ) self.disp.rotation = self._config.get_mapped_rotation() self.disp.fill(0) self.disp.show() diff --git a/python/ctrlboard/src/menu/menu_renderer_config.py b/python/ctrlboard/src/menu/menu_renderer_config.py index 0d12e72c..6d44c866 100644 --- a/python/ctrlboard/src/menu/menu_renderer_config.py +++ b/python/ctrlboard/src/menu/menu_renderer_config.py @@ -5,20 +5,16 @@ class MenuRendererConfig: """Class for configuring menu renderer instances. Provides configuration options such as width, height, i2c address, font, transitions, etc.""" - _rotation_mapper = { - 0: 0, - 90: 1, - 180: 2, - 270: 3 - } + + _rotation_mapper = {0: 0, 90: 1, 180: 2, 270: 3} def __init__(self): self.width = 128 self.height = 64 - self.i2c_address = 0x3c + self.i2c_address = 0x3C self.i2c_port = 1 self.display_type = "ssd1306" # luma-oled supported devices, "sh1106", "ssd1306", ... - self.font_path = 'resources/DejaVuSansMono-Bold.ttf' + self.font_path = "resources/DejaVuSansMono-Bold.ttf" self.font_size = 12 self.row_selection_pixel_extension = 2 self.scroll_behavior = "page" # "extend" or "page" diff --git a/python/ctrlboard/src/menu/menu_renderer_luma_oled.py b/python/ctrlboard/src/menu/menu_renderer_luma_oled.py index e6c30b53..eb29f6ac 100644 --- a/python/ctrlboard/src/menu/menu_renderer_luma_oled.py +++ b/python/ctrlboard/src/menu/menu_renderer_luma_oled.py @@ -5,14 +5,19 @@ from menu.menu_renderer import MenuRenderer class MenuRendererLumaOled(MenuRenderer): """Class implementing the luma oled menu renderer""" + def display_init(self): serial = i2c(port=self._config.i2c_port, address=self._config.i2c_address) import luma.oled.device + device = getattr(luma.oled.device, self._config.display_type) - self.disp = device(serial_interface=serial, width=self._config.width, - height=self._config.height, - rotate=self._config.get_mapped_rotation()) + self.disp = device( + serial_interface=serial, + width=self._config.width, + height=self._config.height, + rotate=self._config.get_mapped_rotation(), + ) self.disp.clear() self.disp.show() diff --git a/python/ctrlboard/src/menu/timer.py b/python/ctrlboard/src/menu/timer.py index 3d41cc6b..9335ecc2 100644 --- a/python/ctrlboard/src/menu/timer.py +++ b/python/ctrlboard/src/menu/timer.py @@ -5,6 +5,7 @@ import time class Timer: """Class implementing a timer class. Takes an activation delay and sets a flag if the activation delay exprires.""" + def __init__(self, activation_delay): self.start_timestamp = int(time.time()) self.activation_delay = activation_delay diff --git a/python/ctrlboard/src/menu/transition.py b/python/ctrlboard/src/menu/transition.py index a61df0a5..d6fbe7ec 100644 --- a/python/ctrlboard/src/menu/transition.py +++ b/python/ctrlboard/src/menu/transition.py @@ -17,6 +17,7 @@ class Transition: class PushTransition(Transition): """Class implementing a push left/right transition.""" + PUSH_LEFT_TRANSITION = "push_left" PUSH_RIGHT_TRANSITION = "push_right" @@ -32,7 +33,7 @@ class PushTransition(Transition): if transition_attributes is not None and transition_attributes != {}: direction = transition_attributes["direction"] - transition_image = Image.new('1', (self.disp.width, self.disp.height)) + transition_image = Image.new("1", (self.disp.width, self.disp.height)) if direction == PushTransition.PUSH_LEFT_TRANSITION: self.perform_push_left(end_image, start_image, transition_image) @@ -57,8 +58,8 @@ class PushTransition(Transition): """Implements a push right transition. Is called by perform depending on the transition attribute 'direction'.""" for x_pos in range(0, 128, self.transition_attributes["transition_speed"]): - left_region = start_image.crop((0, 0, 128-x_pos, 64)) - right_region = end_image.crop((128-x_pos, 0, 128, 64)) + left_region = start_image.crop((0, 0, 128 - x_pos, 64)) + right_region = end_image.crop((128 - x_pos, 0, 128, 64)) transition_image.paste(left_region, (x_pos, 0, 128, 64)) transition_image.paste(right_region, (0, 0, x_pos, 64)) self.disp.display(transition_image) diff --git a/python/ctrlboard/src/observable.py b/python/ctrlboard/src/observable.py index 946dd1c5..5da2db18 100644 --- a/python/ctrlboard/src/observable.py +++ b/python/ctrlboard/src/observable.py @@ -5,6 +5,7 @@ from observer import Observer class Observable: """Class implementing the Observable pattern""" + _observers: List[Observer] = [] def attach(self, observer: Observer): diff --git a/python/ctrlboard/src/observer.py b/python/ctrlboard/src/observer.py index 0fda4830..98eab37b 100644 --- a/python/ctrlboard/src/observer.py +++ b/python/ctrlboard/src/observer.py @@ -5,6 +5,7 @@ from abc import ABC, abstractmethod # pylint: disable=too-few-public-methods class Observer(ABC): """Class implementing an abserver""" + @abstractmethod def update(self, updated_object) -> None: """Abstract method for updating an observer. Needs to be implemented by subclasses.""" diff --git a/python/ctrlboard/src/rascsi_menu_controller.py b/python/ctrlboard/src/rascsi_menu_controller.py index 9b0a538d..d16e57b2 100644 --- a/python/ctrlboard/src/rascsi_menu_controller.py +++ b/python/ctrlboard/src/rascsi_menu_controller.py @@ -9,8 +9,13 @@ from menu.timer import Timer class RascsiMenuController(MenuController): """Class implementing a RaSCSI Control Board UI specific menu controller""" - def __init__(self, refresh_interval, menu_builder: MenuBuilder, - menu_renderer=None, menu_renderer_config=None): + def __init__( + self, + refresh_interval, + menu_builder: MenuBuilder, + menu_renderer=None, + menu_renderer_config=None, + ): super().__init__(menu_builder, menu_renderer, menu_renderer_config) self._refresh_interval = refresh_interval self._menu_renderer: MenuRenderer = menu_renderer diff --git a/python/loopback_test/test.py b/python/loopback_test/test.py index 98e9edb6..3e9e33bb 100644 --- a/python/loopback_test/test.py +++ b/python/loopback_test/test.py @@ -66,91 +66,175 @@ rascsi_none = -1 # Matrix showing all of the SCSI signals, along what signal they're looped back to. # dir_ctrl indicates which direction control pin is associated with that output gpio_map = [ - { 'gpio_num': scsi_d0_gpio, 'attached_to': scsi_ack_gpio, 'dir_ctrl': rascsi_dtd_gpio}, - { 'gpio_num': scsi_d1_gpio, 'attached_to': scsi_sel_gpio, 'dir_ctrl': rascsi_dtd_gpio}, - { 'gpio_num': scsi_d2_gpio, 'attached_to': scsi_atn_gpio, 'dir_ctrl': rascsi_dtd_gpio}, - { 'gpio_num': scsi_d3_gpio, 'attached_to': scsi_rst_gpio, 'dir_ctrl': rascsi_dtd_gpio}, - { 'gpio_num': scsi_d4_gpio, 'attached_to': scsi_cd_gpio, 'dir_ctrl': rascsi_dtd_gpio}, - { 'gpio_num': scsi_d5_gpio, 'attached_to': scsi_io_gpio, 'dir_ctrl': rascsi_dtd_gpio}, - { 'gpio_num': scsi_d6_gpio, 'attached_to': scsi_msg_gpio, 'dir_ctrl': rascsi_dtd_gpio}, - { 'gpio_num': scsi_d7_gpio, 'attached_to': scsi_req_gpio, 'dir_ctrl': rascsi_dtd_gpio}, - { 'gpio_num': scsi_dp_gpio, 'attached_to': scsi_bsy_gpio, 'dir_ctrl': rascsi_dtd_gpio}, - { 'gpio_num': scsi_atn_gpio, 'attached_to': scsi_d2_gpio, 'dir_ctrl': rascsi_ind_gpio}, - { 'gpio_num': scsi_rst_gpio, 'attached_to': scsi_d3_gpio, 'dir_ctrl': rascsi_ind_gpio}, - { 'gpio_num': scsi_ack_gpio, 'attached_to': scsi_d0_gpio, 'dir_ctrl': rascsi_ind_gpio}, - { 'gpio_num': scsi_req_gpio, 'attached_to': scsi_d7_gpio, 'dir_ctrl': rascsi_tad_gpio}, - { 'gpio_num': scsi_msg_gpio, 'attached_to': scsi_d6_gpio, 'dir_ctrl': rascsi_tad_gpio}, - { 'gpio_num': scsi_cd_gpio, 'attached_to': scsi_d4_gpio, 'dir_ctrl': rascsi_tad_gpio}, - { 'gpio_num': scsi_io_gpio, 'attached_to': scsi_d5_gpio, 'dir_ctrl': rascsi_tad_gpio}, - { 'gpio_num': scsi_bsy_gpio, 'attached_to': scsi_dp_gpio, 'dir_ctrl': rascsi_tad_gpio}, - { 'gpio_num': scsi_sel_gpio, 'attached_to': scsi_d1_gpio, 'dir_ctrl': rascsi_ind_gpio}, + { + "gpio_num": scsi_d0_gpio, + "attached_to": scsi_ack_gpio, + "dir_ctrl": rascsi_dtd_gpio, + }, + { + "gpio_num": scsi_d1_gpio, + "attached_to": scsi_sel_gpio, + "dir_ctrl": rascsi_dtd_gpio, + }, + { + "gpio_num": scsi_d2_gpio, + "attached_to": scsi_atn_gpio, + "dir_ctrl": rascsi_dtd_gpio, + }, + { + "gpio_num": scsi_d3_gpio, + "attached_to": scsi_rst_gpio, + "dir_ctrl": rascsi_dtd_gpio, + }, + { + "gpio_num": scsi_d4_gpio, + "attached_to": scsi_cd_gpio, + "dir_ctrl": rascsi_dtd_gpio, + }, + { + "gpio_num": scsi_d5_gpio, + "attached_to": scsi_io_gpio, + "dir_ctrl": rascsi_dtd_gpio, + }, + { + "gpio_num": scsi_d6_gpio, + "attached_to": scsi_msg_gpio, + "dir_ctrl": rascsi_dtd_gpio, + }, + { + "gpio_num": scsi_d7_gpio, + "attached_to": scsi_req_gpio, + "dir_ctrl": rascsi_dtd_gpio, + }, + { + "gpio_num": scsi_dp_gpio, + "attached_to": scsi_bsy_gpio, + "dir_ctrl": rascsi_dtd_gpio, + }, + { + "gpio_num": scsi_atn_gpio, + "attached_to": scsi_d2_gpio, + "dir_ctrl": rascsi_ind_gpio, + }, + { + "gpio_num": scsi_rst_gpio, + "attached_to": scsi_d3_gpio, + "dir_ctrl": rascsi_ind_gpio, + }, + { + "gpio_num": scsi_ack_gpio, + "attached_to": scsi_d0_gpio, + "dir_ctrl": rascsi_ind_gpio, + }, + { + "gpio_num": scsi_req_gpio, + "attached_to": scsi_d7_gpio, + "dir_ctrl": rascsi_tad_gpio, + }, + { + "gpio_num": scsi_msg_gpio, + "attached_to": scsi_d6_gpio, + "dir_ctrl": rascsi_tad_gpio, + }, + { + "gpio_num": scsi_cd_gpio, + "attached_to": scsi_d4_gpio, + "dir_ctrl": rascsi_tad_gpio, + }, + { + "gpio_num": scsi_io_gpio, + "attached_to": scsi_d5_gpio, + "dir_ctrl": rascsi_tad_gpio, + }, + { + "gpio_num": scsi_bsy_gpio, + "attached_to": scsi_dp_gpio, + "dir_ctrl": rascsi_tad_gpio, + }, + { + "gpio_num": scsi_sel_gpio, + "attached_to": scsi_d1_gpio, + "dir_ctrl": rascsi_ind_gpio, + }, ] # List of all of the SCSI signals that is also a dictionary to their human readable name scsi_signals = { - scsi_d0_gpio: 'D0', - scsi_d1_gpio: 'D1', - scsi_d2_gpio: 'D2', - scsi_d3_gpio: 'D3', - scsi_d4_gpio: 'D4', - scsi_d5_gpio: 'D5', - scsi_d6_gpio: 'D6', - scsi_d7_gpio: 'D7', - scsi_dp_gpio: 'DP', - scsi_atn_gpio: 'ATN', - scsi_rst_gpio: 'RST', - scsi_ack_gpio: 'ACK', - scsi_req_gpio: 'REQ', - scsi_msg_gpio: 'MSG', - scsi_cd_gpio: 'CD', - scsi_io_gpio: 'IO', - scsi_bsy_gpio: 'BSY', - scsi_sel_gpio: 'SEL' + scsi_d0_gpio: "D0", + scsi_d1_gpio: "D1", + scsi_d2_gpio: "D2", + scsi_d3_gpio: "D3", + scsi_d4_gpio: "D4", + scsi_d5_gpio: "D5", + scsi_d6_gpio: "D6", + scsi_d7_gpio: "D7", + scsi_dp_gpio: "DP", + scsi_atn_gpio: "ATN", + scsi_rst_gpio: "RST", + scsi_ack_gpio: "ACK", + scsi_req_gpio: "REQ", + scsi_msg_gpio: "MSG", + scsi_cd_gpio: "CD", + scsi_io_gpio: "IO", + scsi_bsy_gpio: "BSY", + scsi_sel_gpio: "SEL", } + # Debug function that just dumps the status of all of the scsi signals to the console def print_all(): for cur_gpio in gpio_map: - print(cur_gpio['name']+"="+str(gpio.input(cur_gpio['gpio_num'])) + " ", end='', flush=True) + print( + cur_gpio["name"] + "=" + str(gpio.input(cur_gpio["gpio_num"])) + " ", + end="", + flush=True, + ) print("") + # Set transceivers IC1 and IC2 to OUTPUT def set_dtd_out(): - gpio.output(rascsi_dtd_gpio,gpio.LOW) + gpio.output(rascsi_dtd_gpio, gpio.LOW) + # Set transceivers IC1 and IC2 to INPUT def set_dtd_in(): - gpio.output(rascsi_dtd_gpio,gpio.HIGH) + gpio.output(rascsi_dtd_gpio, gpio.HIGH) + # Set transceiver IC4 to OUTPUT def set_ind_out(): - gpio.output(rascsi_ind_gpio,gpio.HIGH) + gpio.output(rascsi_ind_gpio, gpio.HIGH) + # Set transceiver IC4 to INPUT def set_ind_in(): - gpio.output(rascsi_ind_gpio,gpio.LOW) + gpio.output(rascsi_ind_gpio, gpio.LOW) + # Set transceiver IC3 to OUTPUT def set_tad_out(): - gpio.output(rascsi_tad_gpio,gpio.HIGH) + gpio.output(rascsi_tad_gpio, gpio.HIGH) + # Set transceiver IC3 to INPUT def set_tad_in(): - gpio.output(rascsi_tad_gpio,gpio.LOW) + gpio.output(rascsi_tad_gpio, gpio.LOW) + # Set the specified transciever to an OUTPUT. All of the other transceivers # will be set to inputs. If a non-existent direction gpio is specified, this # will set all of the transceivers to inputs. def set_output_channel(out_gpio): - if(out_gpio == rascsi_tad_gpio): + if out_gpio == rascsi_tad_gpio: set_tad_out() else: set_tad_in() - if(out_gpio == rascsi_dtd_gpio): + if out_gpio == rascsi_dtd_gpio: set_dtd_out() else: set_dtd_in() - if(out_gpio == rascsi_ind_gpio): + if out_gpio == rascsi_ind_gpio: set_ind_out() else: set_ind_in() @@ -161,11 +245,11 @@ def set_output_channel(out_gpio): def test_gpio_pin(gpio_rec): global err_count - set_output_channel(gpio_rec['dir_ctrl']) + set_output_channel(gpio_rec["dir_ctrl"]) ############################################ # set the test gpio low - gpio.output(gpio_rec['gpio_num'], gpio.LOW) + gpio.output(gpio_rec["gpio_num"], gpio.LOW) time.sleep(pin_settle_delay) @@ -173,18 +257,34 @@ def test_gpio_pin(gpio_rec): for cur_gpio in scsi_signals: # all of the gpios should be high except for the test gpio and the connected gpio cur_val = gpio.input(cur_gpio) - if( cur_gpio == gpio_rec['gpio_num']): - if(cur_val != gpio.LOW): - print("Error: Test commanded GPIO " + scsi_signals[gpio_rec['gpio_num']] + " to be low, but it did not respond") - err_count = err_count+1 - elif (cur_gpio == gpio_rec['attached_to']): - if(cur_val != gpio.LOW): - print("Error: GPIO " + scsi_signals[gpio_rec['gpio_num']] + " should drive " + scsi_signals[gpio_rec['attached_to']] + " low, but did not") - err_count = err_count+1 + if cur_gpio == gpio_rec["gpio_num"]: + if cur_val != gpio.LOW: + print( + "Error: Test commanded GPIO " + + scsi_signals[gpio_rec["gpio_num"]] + + " to be low, but it did not respond" + ) + err_count = err_count + 1 + elif cur_gpio == gpio_rec["attached_to"]: + if cur_val != gpio.LOW: + print( + "Error: GPIO " + + scsi_signals[gpio_rec["gpio_num"]] + + " should drive " + + scsi_signals[gpio_rec["attached_to"]] + + " low, but did not" + ) + err_count = err_count + 1 else: - if(cur_val != gpio.HIGH): - print("Error: GPIO " + scsi_signals[gpio_rec['gpio_num']] + " incorrectly pulled " + scsi_signals[cur_gpio] + " LOW, when it shouldn't have") - err_count = err_count+1 + if cur_val != gpio.HIGH: + print( + "Error: GPIO " + + scsi_signals[gpio_rec["gpio_num"]] + + " incorrectly pulled " + + scsi_signals[cur_gpio] + + " LOW, when it shouldn't have" + ) + err_count = err_count + 1 ############################################ # set the transceivers to input @@ -196,22 +296,31 @@ def test_gpio_pin(gpio_rec): for cur_gpio in scsi_signals: # all of the gpios should be high except for the test gpio cur_val = gpio.input(cur_gpio) - if( cur_gpio == gpio_rec['gpio_num']): - if(cur_val != gpio.LOW): - print("Error: Test commanded GPIO " + scsi_signals[gpio_rec['gpio_num']] + " to be low, but it did not respond") - err_count = err_count+1 + if cur_gpio == gpio_rec["gpio_num"]: + if cur_val != gpio.LOW: + print( + "Error: Test commanded GPIO " + + scsi_signals[gpio_rec["gpio_num"]] + + " to be low, but it did not respond" + ) + err_count = err_count + 1 else: - if(cur_val != gpio.HIGH): - print("Error: GPIO " + scsi_signals[gpio_rec['gpio_num']] + " incorrectly pulled " + scsi_signals[cur_gpio] + " LOW, when it shouldn't have") - err_count = err_count+1 - + if cur_val != gpio.HIGH: + print( + "Error: GPIO " + + scsi_signals[gpio_rec["gpio_num"]] + + " incorrectly pulled " + + scsi_signals[cur_gpio] + + " LOW, when it shouldn't have" + ) + err_count = err_count + 1 # Set the transceiver back to output - set_output_channel(gpio_rec['dir_ctrl']) + set_output_channel(gpio_rec["dir_ctrl"]) ############################################# # set the test gpio high - gpio.output(gpio_rec['gpio_num'], gpio.HIGH) + gpio.output(gpio_rec["gpio_num"], gpio.HIGH) time.sleep(pin_settle_delay) @@ -219,14 +328,24 @@ def test_gpio_pin(gpio_rec): for cur_gpio in scsi_signals: # all of the gpios should be high cur_val = gpio.input(cur_gpio) - if( cur_gpio == gpio_rec['gpio_num']): - if(cur_val != gpio.HIGH): - print("Error: Test commanded GPIO " + scsi_signals[gpio_rec['gpio_num']] + " to be high, but it did not respond") - err_count = err_count+1 + if cur_gpio == gpio_rec["gpio_num"]: + if cur_val != gpio.HIGH: + print( + "Error: Test commanded GPIO " + + scsi_signals[gpio_rec["gpio_num"]] + + " to be high, but it did not respond" + ) + err_count = err_count + 1 else: - if(cur_val != gpio.HIGH): - print("Error: GPIO " + scsi_signals[gpio_rec['gpio_num']] + " incorrectly pulled " + scsi_signals[cur_gpio] + " LOW, when it shouldn't have") - err_count = err_count+1 + if cur_val != gpio.HIGH: + print( + "Error: GPIO " + + scsi_signals[gpio_rec["gpio_num"]] + + " incorrectly pulled " + + scsi_signals[cur_gpio] + + " LOW, when it shouldn't have" + ) + err_count = err_count + 1 # Initialize the GPIO library, set all of the gpios associated with SCSI signals to outputs and set @@ -235,7 +354,7 @@ def setup(): gpio.setmode(gpio.BOARD) gpio.setwarnings(False) for cur_gpio in gpio_map: - gpio.setup(cur_gpio['gpio_num'], gpio.OUT, initial=gpio.HIGH) + gpio.setup(cur_gpio["gpio_num"], gpio.OUT, initial=gpio.HIGH) # Setup direction control gpio.setup(rascsi_ind_gpio, gpio.OUT) @@ -244,7 +363,7 @@ def setup(): # Main functions for running the actual test. -if __name__ == '__main__': +if __name__ == "__main__": # setup the GPIOs setup() # Test each SCSI signal in the gpio_map @@ -252,7 +371,7 @@ if __name__ == '__main__': test_gpio_pin(cur_gpio) # Print the test results - if(err_count == 0): + if err_count == 0: print("-------- Test PASSED --------") else: print("!!!!!!!! Test FAILED !!!!!!!!") diff --git a/python/oled/src/interrupt_handler.py b/python/oled/src/interrupt_handler.py index 89603717..b7b71110 100644 --- a/python/oled/src/interrupt_handler.py +++ b/python/oled/src/interrupt_handler.py @@ -8,6 +8,7 @@ class GracefulInterruptHandler: """ Class for handling Linux signal interrupts """ + def __init__(self, signals=(signal.SIGINT, signal.SIGTERM)): self.signals = signals self.original_handlers = {} diff --git a/python/oled/src/rascsi_oled_monitor.py b/python/oled/src/rascsi_oled_monitor.py index 5cb6adb7..f815d464 100755 --- a/python/oled/src/rascsi_oled_monitor.py +++ b/python/oled/src/rascsi_oled_monitor.py @@ -52,7 +52,7 @@ parser.add_argument( default=180, action="store", help="The rotation of the screen buffer in degrees", - ) +) parser.add_argument( "--height", type=int, @@ -60,7 +60,7 @@ parser.add_argument( default=32, action="store", help="The pixel height of the screen buffer", - ) +) parser.add_argument( "--refresh_interval", type=int, @@ -68,14 +68,14 @@ parser.add_argument( default=1000, action="store", help="Interval in ms between each screen refresh", - ) +) parser.add_argument( "--password", type=str, default="", action="store", help="Token password string for authenticating with the backend", - ) +) parser.add_argument( "--host", type=str, @@ -156,7 +156,7 @@ LINE_SPACING = 8 # When using other fonts, you may need to adjust PADDING, FONT_SIZE, # LINE_SPACING, and LINES. # Some other nice fonts to try: http://www.dafont.com/bitmap.php -FONT = ImageFont.truetype('resources/type_writer.ttf', FONT_SIZE) +FONT = ImageFont.truetype("resources/type_writer.ttf", FONT_SIZE) REMOVABLE_DEVICE_TYPES = ractl_cmd.get_removable_device_types() PERIPHERAL_DEVICE_TYPES = ractl_cmd.get_peripheral_device_types() @@ -176,6 +176,7 @@ DRAW = ImageDraw.Draw(IMAGE) # Draw a black filled box to clear the image. DRAW.rectangle((0, 0, WIDTH, HEIGHT), outline=0, fill=0) + def formatted_output(): """ Formats the strings to be displayed on the Screen @@ -216,6 +217,7 @@ def formatted_output(): output += ["No network connection"] return output + def shutdown(): """ Display the shutdown splash, then blank the screen after a sleep @@ -224,7 +226,7 @@ def shutdown(): OLED.image(IMAGE_STOP) OLED.show() OLED.fill(0) - sleep(700/1000) + sleep(700 / 1000) OLED.show() sys.exit("Shutting down the OLED display...") @@ -264,7 +266,7 @@ with GracefulInterruptHandler() as handler: # Display image. OLED.image(IMAGE) OLED.show() - sleep(args.refresh_interval/1000) + sleep(args.refresh_interval / 1000) snapshot = formatted_output() diff --git a/python/pyproject.toml b/python/pyproject.toml new file mode 100644 index 00000000..aea91ba3 --- /dev/null +++ b/python/pyproject.toml @@ -0,0 +1,4 @@ +[tool.black] +line-length = 100 +target-version = ['py37', 'py38', 'py39'] +extend-exclude = ".*_pb2.py" diff --git a/python/web/.flake8 b/python/web/.flake8 deleted file mode 100644 index 7da1f960..00000000 --- a/python/web/.flake8 +++ /dev/null @@ -1,2 +0,0 @@ -[flake8] -max-line-length = 100 diff --git a/python/web/pyproject.toml b/python/web/pyproject.toml index 7adcc930..25fc8772 100644 --- a/python/web/pyproject.toml +++ b/python/web/pyproject.toml @@ -1,8 +1,4 @@ [tool.pytest.ini_options] addopts = "--junitxml=report.xml" log_cli = true -log_cli_level = "warn" - -[tool.black] -line-length = 100 -target-version = ['py37', 'py38', 'py39'] +log_cli_level = "warn" \ No newline at end of file diff --git a/python/web/src/return_code_mapper.py b/python/web/src/return_code_mapper.py index 94d9a5f3..f5572ca5 100644 --- a/python/web/src/return_code_mapper.py +++ b/python/web/src/return_code_mapper.py @@ -1,4 +1,4 @@ -"""Module for mapping between rascsi return codes and translated strings""" +"""Module for mapping between return codes and translated strings""" from rascsi.return_codes import ReturnCodes from flask_babel import _, lazy_gettext @@ -6,8 +6,9 @@ from flask_babel import _, lazy_gettext # pylint: disable=too-few-public-methods class ReturnCodeMapper: - """Class for mapping between rascsi return codes and translated strings""" + """Class for mapping between return codes and translated strings""" + # fmt: off MESSAGES = { ReturnCodes.DELETEFILE_SUCCESS: _("File deleted: %(file_path)s"), @@ -46,11 +47,12 @@ class ReturnCodeMapper: ReturnCodes.EXTRACTIMAGE_COMMAND_ERROR: _("Unable to extract archive: %(error)s"), } + # fmt: on @staticmethod def add_msg(payload): - """adds a msg key to a given payload with a rascsi module return code - with a translated return code message string. """ + """adds a msg key to a given payload with a module return code + with a translated return code message string.""" if "return_code" not in payload: return payload @@ -60,7 +62,7 @@ class ReturnCodeMapper: payload["msg"] = lazy_gettext( ReturnCodeMapper.MESSAGES[payload["return_code"]], **parameters, - ) + ) else: payload["msg"] = lazy_gettext(ReturnCodeMapper.MESSAGES[payload["return_code"]]) diff --git a/python/web/src/settings.py b/python/web/src/settings.py index 538c7bfe..e0d033e4 100644 --- a/python/web/src/settings.py +++ b/python/web/src/settings.py @@ -30,4 +30,4 @@ TEMPLATE_THEMES = ["classic", "modern"] TEMPLATE_THEME_DEFAULT = "modern" # Fallback theme for older browsers -TEMPLATE_THEME_LEGACY = "classic" \ No newline at end of file +TEMPLATE_THEME_LEGACY = "classic" diff --git a/python/web/src/socket_cmds_flask.py b/python/web/src/socket_cmds_flask.py index d379adfb..4e7e098d 100644 --- a/python/web/src/socket_cmds_flask.py +++ b/python/web/src/socket_cmds_flask.py @@ -5,9 +5,11 @@ Module for sending and receiving data over a socket connection with the RaSCSI b from flask import abort from flask_babel import _ -from rascsi.exceptions import (EmptySocketChunkException, - InvalidProtobufResponse, - FailedSocketConnectionException) +from rascsi.exceptions import ( + EmptySocketChunkException, + InvalidProtobufResponse, + FailedSocketConnectionException, +) from rascsi.socket_cmds import SocketCmds @@ -15,6 +17,7 @@ class SocketCmdsFlask(SocketCmds): """ Class for sending and receiving data over a socket connection with the RaSCSI backend """ + # pylint: disable=useless-super-delegation def __init__(self, host="localhost", port=6868): super().__init__(host, port) @@ -28,11 +31,16 @@ class SocketCmdsFlask(SocketCmds): return super().send_pb_command(payload) except FailedSocketConnectionException as err: # After failing all attempts, throw a 404 error - abort(404, _( - "The RaSCSI Web Interface failed to connect to RaSCSI at %(host)s:%(port)s " - "with error: %(error_msg)s. The RaSCSI process is not running or may have crashed.", - host=self.host, port=self.port, error_msg=str(err), - ) + abort( + 404, + _( + "The RaSCSI Web Interface failed to connect to RaSCSI at " + "%(host)s:%(port)s with error: %(error_msg)s. The RaSCSI " + "process is not running or may have crashed.", + host=self.host, + port=self.port, + error_msg=str(err), + ), ) return None @@ -42,19 +50,21 @@ class SocketCmdsFlask(SocketCmds): return super().send_over_socket(sock, payload) except EmptySocketChunkException: abort( - 503, _( + 503, + _( "The RaSCSI Web Interface lost connection to RaSCSI. " "Please go back and try again. " "If the issue persists, please report a bug." - ) + ), ) return None except InvalidProtobufResponse: abort( - 500, _( + 500, + _( "The RaSCSI Web Interface did not get a valid response from RaSCSI. " "Please go back and try again. " "If the issue persists, please report a bug." - ) + ), ) return None diff --git a/python/web/src/web.py b/python/web/src/web.py index 77266b56..03e109df 100644 --- a/python/web/src/web.py +++ b/python/web/src/web.py @@ -11,7 +11,6 @@ from grp import getgrall import bjoern from rascsi.return_codes import ReturnCodes -from werkzeug.utils import secure_filename from simplepam import authenticate from flask_babel import Babel, Locale, refresh, _ @@ -75,6 +74,7 @@ from settings import ( APP = Flask(__name__) BABEL = Babel(APP) + def get_env_info(): """ Get information about the app/host environment @@ -113,7 +113,7 @@ def response( redirect_url=None, error=False, status_code=200, - **kwargs + **kwargs, ): """ Generates a HTML or JSON HTTP response @@ -128,11 +128,16 @@ def response( messages = [(str(message), status)] if request.headers.get("accept") == "application/json": - return jsonify({ - "status": status, - "messages": [{"message": m, "category": c} for m, c in messages], - "data": kwargs - }), status_code + return ( + jsonify( + { + "status": status, + "messages": [{"message": m, "category": c} for m, c in messages], + "data": kwargs, + } + ), + status_code, + ) if messages: for message, category in messages: @@ -182,7 +187,7 @@ def get_supported_locales(): locales = [ {"language": x.language, "display_name": x.display_name} for x in [*BABEL.list_translations(), Locale("en")] - ] + ] return sorted(locales, key=lambda x: x["language"]) @@ -199,8 +204,8 @@ def index(): _( "RaSCSI is password protected. " "Start the Web Interface with the --password parameter." - ), - ) + ), + ) server_info = ractl_cmd.get_server_info() devices = ractl_cmd.list_devices() @@ -233,18 +238,15 @@ def index(): # This might break if something like 'hdt' etc. gets added in the future. sorted( [suffix for suffix in server_info["schd"] if suffix not in {"hdi", "nhd"}], - reverse=True - ) + - server_info["scrm"] + - server_info["scmo"] + reverse=True, ) + + server_info["scrm"] + + server_info["scmo"] + ) valid_image_suffixes = ( - server_info["schd"] + - server_info["scrm"] + - server_info["scmo"] + - server_info["sccd"] - ) + server_info["schd"] + server_info["scrm"] + server_info["scmo"] + server_info["sccd"] + ) return response( template="index.html", @@ -296,7 +298,7 @@ def drive_list(): template="drives.html", files=file_cmd.list_images()["files"], drive_properties=format_drive_properties(APP.config["RASCSI_DRIVE_PROPERTIES"]), - ) + ) @APP.route("/login", methods=["POST"]) @@ -312,10 +314,14 @@ def login(): session["username"] = request.form["username"] return response(env=get_env_info()) - return response(error=True, status_code=401, message=_( - "You must log in with valid credentials for a user in the '%(group)s' group", - group=AUTH_GROUP, - )) + return response( + error=True, + status_code=401, + message=_( + "You must log in with valid credentials for a user in the '%(group)s' group", + group=AUTH_GROUP, + ), + ) @APP.route("/logout") @@ -339,12 +345,14 @@ def login_required(func): """ Wrapper method for enabling authentication for an endpoint """ + @wraps(func) def decorated_function(*args, **kwargs): auth = auth_active(AUTH_GROUP) if auth["status"] and "username" not in session: return response(error=True, message=auth["msg"]) return func(*args, **kwargs) + return decorated_function @@ -357,10 +365,7 @@ def drive_create(): drive_name = request.form.get("drive_name") file_name = Path(request.form.get("file_name")).name - properties = get_properties_by_drive_name( - APP.config["RASCSI_DRIVE_PROPERTIES"], - drive_name - ) + properties = get_properties_by_drive_name(APP.config["RASCSI_DRIVE_PROPERTIES"], drive_name) if not properties: return response( @@ -373,7 +378,7 @@ def drive_create(): file_name, properties["file_type"], properties["size"], - ) + ) if not process["status"]: return response(error=True, message=process["msg"]) @@ -385,8 +390,11 @@ def drive_create(): if not process["status"]: return response(error=True, message=process["msg"]) - return response(message= - _("Image file with properties created: %(file_name)s", file_name=full_file_name) + return response( + message=_( + "Image file with properties created: %(file_name)s", + file_name=full_file_name, + ) ) @@ -401,10 +409,7 @@ def drive_cdrom(): # Creating the drive properties file file_name = f"{file_name}.{PROPERTIES_SUFFIX}" - properties = get_properties_by_drive_name( - APP.config["RASCSI_DRIVE_PROPERTIES"], - drive_name - ) + properties = get_properties_by_drive_name(APP.config["RASCSI_DRIVE_PROPERTIES"], drive_name) if not properties: return response( @@ -474,19 +479,17 @@ def show_diskinfo(): if not safe_path["status"]: return response(error=True, message=safe_path["msg"]) server_info = ractl_cmd.get_server_info() - returncode, diskinfo = sys_cmd.get_diskinfo( - Path(server_info["image_dir"]) / file_name - ) + returncode, diskinfo = sys_cmd.get_diskinfo(Path(server_info["image_dir"]) / file_name) if returncode == 0: return response( template="diskinfo.html", file_name=str(file_name), diskinfo=diskinfo, - ) + ) return response( error=True, - message=_("An error occurred when getting disk info: %(error)s", error=diskinfo) + message=_("An error occurred when getting disk info: %(error)s", error=diskinfo), ) @@ -497,23 +500,20 @@ def show_manpage(): """ app_allowlist = ["rascsi", "rasctl", "rasdump", "scsimon"] - app = request.args.get("app", type = str) + app = request.args.get("app", type=str) if app not in app_allowlist: - return response( - error=True, - message=_("%(app)s is not a recognized RaSCSI app", app=app) - ) + return response(error=True, message=_("%(app)s is not a recognized RaSCSI app", app=app)) file_path = f"{WEB_DIR}/../../../doc/{app}.1" html_to_strip = [ - "Content-type", - "!DOCTYPE", - "", - "", - "", - "

", - ] + "Content-type", + "!DOCTYPE", + "", + "", + "", + "

", + ] returncode, manpage = sys_cmd.get_manpage(file_path) if returncode == 0: @@ -524,7 +524,7 @@ def show_manpage(): line = line.replace("/?1+", "manpage?app=") # Strip out useless hyperlink elif "man2html" in line: - line = line.replace("man2html", "man2html") + line = line.replace('man2html', "man2html") if not any(ele in line for ele in html_to_strip): formatted_manpage += line @@ -532,11 +532,11 @@ def show_manpage(): template="manpage.html", app=app, manpage=formatted_manpage, - ) + ) return response( error=True, - message=_("An error occurred when accessing man page: %(error)s", error=manpage) + message=_("An error occurred when accessing man page: %(error)s", error=manpage), ) @@ -555,11 +555,11 @@ def show_logs(): scope=scope, lines=lines, logs=logs, - ) + ) return response( error=True, - message=_("An error occurred when fetching logs: %(error)s", error=logs) + message=_("An error occurred when fetching logs: %(error)s", error=logs), ) @@ -615,10 +615,10 @@ def attach_device(): return response(error=True, message=bridge_status["msg"]) kwargs = { - "unit": int(unit), - "device_type": device_type, - "params": params, - } + "unit": int(unit), + "device_type": device_type, + "params": params, + } if drive_props: kwargs["vendor"] = drive_props["vendor"] kwargs["product"] = drive_props["product"] @@ -628,12 +628,14 @@ def attach_device(): process = ractl_cmd.attach_device(scsi_id, **kwargs) process = ReturnCodeMapper.add_msg(process) if process["status"]: - return response(message=_( - "Attached %(device_type)s to SCSI ID %(id_number)s LUN %(unit_number)s", - device_type=get_device_name(device_type), - id_number=scsi_id, - unit_number=unit, - )) + return response( + message=_( + "Attached %(device_type)s to SCSI ID %(id_number)s LUN %(unit_number)s", + device_type=get_device_name(device_type), + id_number=scsi_id, + unit_number=unit, + ) + ) return response(error=True, message=process["msg"]) @@ -687,7 +689,7 @@ def attach_image(): "The image may be corrupted, so proceed with caution.", file_size, expected_block_size, - ) + ) return response( message=_( "Attached %(file_name)s as %(device_type)s to " @@ -696,8 +698,8 @@ def attach_image(): device_type=get_device_name(device_type), id_number=scsi_id, unit_number=unit, - ) ) + ) return response(error=True, message=process["msg"]) @@ -725,8 +727,13 @@ def detach(): unit = request.form.get("unit") process = ractl_cmd.detach_by_id(scsi_id, unit) if process["status"]: - return response(message=_("Detached SCSI ID %(id_number)s LUN %(unit_number)s", - id_number=scsi_id, unit_number=unit)) + return response( + message=_( + "Detached SCSI ID %(id_number)s LUN %(unit_number)s", + id_number=scsi_id, + unit_number=unit, + ) + ) return response(error=True, message=process["msg"]) @@ -742,8 +749,13 @@ def eject(): process = ractl_cmd.eject_by_id(scsi_id, unit) if process["status"]: - return response(message=_("Ejected SCSI ID %(id_number)s LUN %(unit_number)s", - id_number=scsi_id, unit_number=unit)) + return response( + message=_( + "Ejected SCSI ID %(id_number)s LUN %(unit_number)s", + id_number=scsi_id, + unit_number=unit, + ) + ) return response(error=True, message=process["msg"]) @@ -758,7 +770,7 @@ def device_info(): return response( template="deviceinfo.html", devices=process["device_list"], - ) + ) return response(error=True, message=_("No devices attached")) @@ -793,7 +805,9 @@ def release_id(): process = ractl_cmd.reserve_scsi_ids(reserved_ids) if process["status"]: RESERVATIONS[int(scsi_id)] = "" - return response(message=_("Released the reservation for SCSI ID %(id_number)s", id_number=scsi_id)) + return response( + message=_("Released the reservation for SCSI ID %(id_number)s", id_number=scsi_id) + ) return response(error=True, message=process["msg"]) @@ -868,7 +882,7 @@ def download_to_iso(): else: return response( error=True, - message=_("%(iso_type)s is not a valid CD-ROM format.", iso_type=iso_type) + message=_("%(iso_type)s is not a valid CD-ROM format.", iso_type=iso_type), ) process = file_cmd.download_file_to_iso(url, *iso_args) @@ -883,10 +897,10 @@ def download_to_iso(): ) process_attach = ractl_cmd.attach_device( - scsi_id, - device_type="SCCD", - params={"file": process["file_name"]}, - ) + scsi_id, + device_type="SCCD", + params={"file": process["file_name"]}, + ) process_attach = ReturnCodeMapper.add_msg(process_attach) if process_attach["status"]: return response( @@ -965,7 +979,7 @@ def create_file(): Creates an empty image file in the images dir """ file_name = Path(request.form.get("file_name")) - size = (int(request.form.get("size")) * 1024 * 1024) + size = int(request.form.get("size")) * 1024 * 1024 file_type = request.form.get("type") drive_name = request.form.get("drive_name") drive_format = request.form.get("drive_format") @@ -984,11 +998,11 @@ def create_file(): if drive_format: volume_name = f"HD {size / 1024 / 1024:0.0f}M" known_formats = [ - "Lido 7.56", - "SpeedTools 3.6", - "FAT16", - "FAT32", - ] + "Lido 7.56", + "SpeedTools 3.6", + "FAT16", + "FAT32", + ] message_postfix = f" ({drive_format})" if drive_format not in known_formats: @@ -997,7 +1011,7 @@ def create_file(): message=_( "%(drive_format)s is not a valid hard disk format.", drive_format=drive_format, - ) + ), ) elif drive_format.startswith("FAT"): if drive_format == "FAT16": @@ -1010,7 +1024,7 @@ def create_file(): message=_( "%(drive_format)s is not a valid hard disk format.", drive_format=drive_format, - ) + ), ) process = file_cmd.partition_disk(full_file_name, volume_name, "FAT") @@ -1018,11 +1032,11 @@ def create_file(): return response(error=True, message=process["msg"]) process = file_cmd.format_fat( - full_file_name, - # FAT volume labels are max 11 chars - volume_name[:11], - fat_size, - ) + full_file_name, + # FAT volume labels are max 11 chars + volume_name[:11], + fat_size, + ) if not process["status"]: return response(error=True, message=process["msg"]) @@ -1033,19 +1047,16 @@ def create_file(): return response(error=True, message=process["msg"]) process = file_cmd.format_hfs( - full_file_name, - volume_name, - driver_base_path / Path(drive_format.replace(" ", "-") + ".img"), - ) + full_file_name, + volume_name, + driver_base_path / Path(drive_format.replace(" ", "-") + ".img"), + ) if not process["status"]: return response(error=True, message=process["msg"]) # Creating the drive properties file, if one is chosen if drive_name: - properties = get_properties_by_drive_name( - APP.config["RASCSI_DRIVE_PROPERTIES"], - drive_name - ) + properties = get_properties_by_drive_name(APP.config["RASCSI_DRIVE_PROPERTIES"], drive_name) if properties: prop_file_name = f"{full_file_name}.{PROPERTIES_SUFFIX}" process = file_cmd.write_drive_properties(prop_file_name, properties) @@ -1221,26 +1232,23 @@ def extract_image(): safe_path = is_safe_path(archive_file) if not safe_path["status"]: return response(error=True, message=safe_path["msg"]) - extract_result = file_cmd.extract_image( - str(archive_file), - archive_members - ) + extract_result = file_cmd.extract_image(str(archive_file), archive_members) if extract_result["return_code"] == ReturnCodes.EXTRACTIMAGE_SUCCESS: for properties_file in extract_result["properties_files_moved"]: if properties_file["status"]: logging.info( - "Properties file %s moved to %s", - properties_file["name"], - CFG_DIR, - ) + "Properties file %s moved to %s", + properties_file["name"], + CFG_DIR, + ) else: logging.warning( - "Failed to move properties file %s to %s", - properties_file["name"], - CFG_DIR, - ) + "Failed to move properties file %s to %s", + properties_file["name"], + CFG_DIR, + ) return response(message=ReturnCodeMapper.add_msg(extract_result).get("msg")) @@ -1300,28 +1308,28 @@ if __name__ == "__main__": default=8080, action="store", help="Port number the web server will run on", - ) + ) parser.add_argument( "--password", type=str, default="", action="store", help="Token password string for authenticating with RaSCSI", - ) + ) parser.add_argument( "--rascsi-host", type=str, default="localhost", action="store", help="RaSCSI host. Default: localhost", - ) + ) parser.add_argument( "--rascsi-port", type=int, default=6868, action="store", help="RaSCSI port. Default: 6868", - ) + ) parser.add_argument( "--log-level", type=str, @@ -1329,12 +1337,12 @@ if __name__ == "__main__": action="store", help="Log level for Web UI. Default: warning", choices=["debug", "info", "warning", "error", "critical"], - ) + ) parser.add_argument( "--dev-mode", action="store_true", - help="Run in development mode" - ) + help="Run in development mode", + ) arguments = parser.parse_args() APP.config["RASCSI_TOKEN"] = arguments.password @@ -1357,14 +1365,17 @@ if __name__ == "__main__": APP.config["RASCSI_DRIVE_PROPERTIES"] = [] logging.warning("Could not read drive properties from %s", DRIVE_PROPERTIES_FILE) - logging.basicConfig(stream=sys.stdout, - format="%(asctime)s %(levelname)s %(filename)s:%(lineno)s %(message)s", - level=arguments.log_level.upper()) + logging.basicConfig( + stream=sys.stdout, + format="%(asctime)s %(levelname)s %(filename)s:%(lineno)s %(message)s", + level=arguments.log_level.upper(), + ) if arguments.dev_mode: print("Running rascsi-web in development mode ...") APP.debug = True from werkzeug.debug import DebuggedApplication + try: bjoern.run(DebuggedApplication(APP, evalex=False), "0.0.0.0", arguments.port) except KeyboardInterrupt: diff --git a/python/web/src/web_utils.py b/python/web/src/web_utils.py index a1db8b3a..d0b48c05 100644 --- a/python/web/src/web_utils.py +++ b/python/web/src/web_utils.py @@ -14,6 +14,7 @@ from werkzeug.utils import secure_filename from rascsi.sys_cmds import SysCmds + def get_valid_scsi_ids(devices, reserved_ids): """ Takes a list of (dict)s devices, and list of (int)s reserved_ids. @@ -43,7 +44,7 @@ def get_valid_scsi_ids(devices, reserved_ids): "valid_ids": valid_ids, "occupied_ids": occupied_ids, "recommended_id": recommended_id, - } + } def sort_and_format_devices(devices): @@ -180,7 +181,8 @@ def format_drive_properties(drive_properties): "cd_conf": cd_conf, "rm_conf": rm_conf, "mo_conf": mo_conf, - } + } + def get_properties_by_drive_name(drives, drive_name): """ @@ -198,11 +200,12 @@ def get_properties_by_drive_name(drives, drive_name): "revision": drive["revision"], "block_size": drive["block_size"], "size": drive["size"], - } + } logging.error("Properties for drive '%s' does not exist in database", drive_name) return False + def auth_active(group): """ Inspects if the group defined in (str) group exists on the system. @@ -212,9 +215,9 @@ def auth_active(group): groups = [g.gr_name for g in getgrall()] if group in groups: return { - "status": True, - "msg": _("You must log in to use this function"), - } + "status": True, + "msg": _("You must log in to use this function"), + } return {"status": False, "msg": ""} @@ -272,7 +275,7 @@ def upload_with_dropzonejs(image_dir): file_name = secure_filename(file_object.filename) save_path = path.join(image_dir, file_name) - current_chunk = int(request.form['dzchunkindex']) + current_chunk = int(request.form["dzchunkindex"]) # Makes sure not to overwrite an existing file, # but continues writing to a file transfer in progress @@ -307,21 +310,21 @@ def browser_supports_modern_themes(): return False user_agent = user_agent_parser.Parse(user_agent_string) - if not user_agent['user_agent']['family']: + if not user_agent["user_agent"]["family"]: return False # (family, minimum version) supported_browsers = [ - ('Safari', 14), - ('Chrome', 100), - ('Firefox', 100), - ('Edge', 100), - ('Mobile Safari', 14), - ('Chrome Mobile', 100), + ("Safari", 14), + ("Chrome", 100), + ("Firefox", 100), + ("Edge", 100), + ("Mobile Safari", 14), + ("Chrome Mobile", 100), ] - current_ua_family = user_agent['user_agent']['family'] - current_ua_version = float(user_agent['user_agent']['major']) + current_ua_family = user_agent["user_agent"]["family"] + current_ua_version = float(user_agent["user_agent"]["major"]) logging.info(f"Identified browser as family={current_ua_family}, version={current_ua_version}") for supported_browser, supported_version in supported_browsers: