diff --git a/python/.pylintrc b/python/.pylintrc index b1a8aa6b..99c84cb5 100644 --- a/python/.pylintrc +++ b/python/.pylintrc @@ -140,8 +140,7 @@ disable=print-statement, xreadlines-attribute, deprecated-sys-function, exception-escape, - comprehension-escape, - import-outside-toplevel + comprehension-escape # Enable the message, report, category or checker with the given id(s). You can # either give multiple identifier separated by comma (,) or put this option diff --git a/python/common/src/rascsi/file_cmds.py b/python/common/src/rascsi/file_cmds.py index b3e493ec..bfc71af6 100644 --- a/python/common/src/rascsi/file_cmds.py +++ b/python/common/src/rascsi/file_cmds.py @@ -4,8 +4,16 @@ Module for methods reading from and writing to the file system import os import logging -from pathlib import PurePath import asyncio +from pathlib import PurePath +from zipfile import ZipFile, is_zipfile +from re import escape, findall +from time import time +from subprocess import run, CalledProcessError +from json import dump, load + +import requests + import rascsi_interface_pb2 as proto from rascsi.common_settings import CFG_DIR, CONFIG_FILE_SUFFIX, PROPERTIES_SUFFIX, RESERVATIONS from rascsi.ractl_cmds import RaCtlCmds @@ -79,7 +87,6 @@ class FileCmds: prop_data = self.list_files(PROPERTIES_SUFFIX, CFG_DIR) prop_files = [PurePath(x[0]).stem for x in prop_data] - from zipfile import ZipFile, is_zipfile server_info = self.ractl.get_server_info() files = [] for file in result.image_files_info.image_files: @@ -225,12 +232,11 @@ class FileCmds: members contains all of the full paths to each of the zip archive members Returns (dict) with (boolean) status and (list of str) msg """ - from asyncio import run server_info = self.ractl.get_server_info() prop_flag = False if not member: - unzip_proc = run(self.run_async( + unzip_proc = asyncio.run(self.run_async( f"unzip -d {server_info['image_dir']} -n -j " f"{server_info['image_dir']}/{file_name}" )) @@ -241,14 +247,13 @@ class FileCmds: self.rename_file(f"{server_info['image_dir']}/{name}", f"{CFG_DIR}/{name}") prop_flag = True else: - from re import escape member = escape(member) - unzip_proc = run(self.run_async( + unzip_proc = asyncio.run(self.run_async( f"unzip -d {server_info['image_dir']} -n -j " f"{server_info['image_dir']}/{file_name} {member}" )) # Attempt to unzip a properties file in the same archive dir - unzip_prop = run(self.run_async( + unzip_prop = asyncio.run(self.run_async( f"unzip -d {CFG_DIR} -n -j " f"{server_info['image_dir']}/{file_name} {member}.{PROPERTIES_SUFFIX}" )) @@ -258,7 +263,6 @@ class FileCmds: logging.warning("Unzipping failed: %s", unzip_proc["stderr"]) return {"status": False, "msg": unzip_proc["stderr"]} - from re import findall unzipped = findall( "(?:inflating|extracting):(.+)\n", unzip_proc["stdout"] @@ -270,8 +274,6 @@ class FileCmds: Takes (str) url and one or more (str) *iso_args Returns (dict) with (bool) status and (str) msg """ - from time import time - from subprocess import run, CalledProcessError server_info = self.ractl.get_server_info() @@ -287,7 +289,6 @@ class FileCmds: if not req_proc["status"]: return {"status": False, "msg": req_proc["msg"]} - from zipfile import is_zipfile, ZipFile if is_zipfile(tmp_full_path): if "XtraStuf.mac" in str(ZipFile(tmp_full_path).namelist()): logging.info("MacZip file format detected. Will not unzip to retain resource fork.") @@ -339,7 +340,6 @@ class FileCmds: Takes (str) url, (str) save_dir, (str) file_name Returns (dict) with (bool) status and (str) msg """ - import requests logging.info("Making a request to download %s", url) try: @@ -371,7 +371,6 @@ class FileCmds: Takes (str) file_name Returns (dict) with (bool) status and (str) msg """ - from json import dump file_name = f"{CFG_DIR}/{file_name}" try: with open(file_name, "w", encoding="ISO-8859-1") as json_file: @@ -433,7 +432,6 @@ class FileCmds: Takes (str) file_name Returns (dict) with (bool) status and (str) msg """ - from json import load file_name = f"{CFG_DIR}/{file_name}" try: with open(file_name, encoding="ISO-8859-1") as json_file: @@ -512,7 +510,6 @@ class FileCmds: Takes file name base (str) and (list of dicts) conf as arguments Returns (dict) with (bool) status and (str) msg """ - from json import dump file_path = f"{CFG_DIR}/{file_name}" try: with open(file_path, "w") as json_file: @@ -548,7 +545,6 @@ class FileCmds: Takes (str) file_path as argument. Returns (dict) with (bool) status, (str) msg, (dict) conf """ - from json import load try: with open(file_path) as json_file: conf = load(json_file) diff --git a/python/common/src/rascsi/ractl_cmds.py b/python/common/src/rascsi/ractl_cmds.py index 3e0576fe..cd0dc2bb 100644 --- a/python/common/src/rascsi/ractl_cmds.py +++ b/python/common/src/rascsi/ractl_cmds.py @@ -162,7 +162,7 @@ class RaCtlCmds: def get_disk_device_types(self): """ - Returns a (list) of (str) of four letter device acronyms + Returns a (list) of (str) of four letter device acronyms that take image files as arguments. """ device_types = self.get_device_types() diff --git a/python/common/src/rascsi/socket_cmds.py b/python/common/src/rascsi/socket_cmds.py index ea3b867f..5b5a7f04 100644 --- a/python/common/src/rascsi/socket_cmds.py +++ b/python/common/src/rascsi/socket_cmds.py @@ -3,7 +3,10 @@ Module for sending and receiving data over a socket connection with the RaSCSI b """ import logging +import socket from time import sleep +from struct import pack, unpack + from rascsi.exceptions import (EmptySocketChunkException, InvalidProtobufResponse, FailedSocketConnectionException) @@ -27,7 +30,6 @@ class SocketCmds: tries = 20 error_msg = "" - import socket while counter < tries: try: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: @@ -56,7 +58,6 @@ class SocketCmds: Tries to extract and interpret the protobuf header to get response size. Reads data from socket in 2048 bytes chunks until all data is received. """ - from struct import pack, unpack # Sending the magic word "RASCSI" to authenticate with the server sock.send(b"RASCSI") diff --git a/python/common/src/rascsi/sys_cmds.py b/python/common/src/rascsi/sys_cmds.py new file mode 100644 index 00000000..7fd79e8d --- /dev/null +++ b/python/common/src/rascsi/sys_cmds.py @@ -0,0 +1,170 @@ +""" +Module with methods that interact with the Pi system +""" +import subprocess +import logging +from subprocess import run +from shutil import disk_usage +from re import findall, match +from socket import socket, gethostname, AF_INET, SOCK_DGRAM + + +class SysCmds: + """ + Class for commands sent to the Pi's Linux system. + """ + + @staticmethod + def running_env(): + """ + Returns (str) git and (str) env + git contains the git hash of the checked out code + env is the various system information where this app is running + """ + try: + ra_git_version = ( + subprocess.run( + ["git", "rev-parse", "HEAD"], + capture_output=True, + check=True, + ) + .stdout.decode("utf-8") + .strip() + ) + except subprocess.CalledProcessError as error: + logging.warning("Executed shell command: %s", " ".join(error.cmd)) + logging.warning("Got error: %s", error.stderr.decode("utf-8")) + ra_git_version = "" + + try: + pi_version = ( + subprocess.run( + ["uname", "-a"], + capture_output=True, + check=True, + ) + .stdout.decode("utf-8") + .strip() + ) + except subprocess.CalledProcessError as error: + logging.warning("Executed shell command: %s", " ".join(error.cmd)) + logging.warning("Got error: %s", error.stderr.decode("utf-8")) + pi_version = "Unknown" + + return {"git": ra_git_version, "env": pi_version} + + @staticmethod + def running_proc(daemon): + """ + Takes (str) daemon + Returns (int) proc, which is the number of processes currently running + """ + try: + processes = ( + subprocess.run( + ["ps", "aux"], + capture_output=True, + check=True, + ) + .stdout.decode("utf-8") + .strip() + ) + except subprocess.CalledProcessError as error: + logging.warning("Executed shell command: %s", " ".join(error.cmd)) + logging.warning("Got error: %s", error.stderr.decode("utf-8")) + processes = "" + + matching_processes = findall(daemon, processes) + return len(matching_processes) + + @staticmethod + def is_bridge_setup(): + """ + Returns (bool) True if the rascsi_bridge network interface exists + """ + try: + bridges = ( + subprocess.run( + ["brctl", "show"], + capture_output=True, + check=True, + ) + .stdout.decode("utf-8") + .strip() + ) + except subprocess.CalledProcessError as error: + logging.warning("Executed shell command: %s", " ".join(error.cmd)) + logging.warning("Got error: %s", error.stderr.decode("utf-8")) + bridges = "" + + if "rascsi_bridge" in bridges: + return True + return False + + @staticmethod + def disk_space(): + """ + Returns a (dict) with (int) total (int) used (int) free + This is the disk space information of the volume where this app is running + """ + total, used, free = disk_usage(__file__) + return {"total": total, "used": used, "free": free} + + @staticmethod + def introspect_file(file_path, re_term): + """ + Takes a (str) file_path and (str) re_term in regex format + Will introspect file_path for the existance of re_term + and return True if found, False if not found + """ + try: + ifile = open(file_path, "r", encoding="ISO-8859-1") + except (IOError, ValueError, EOFError, TypeError) as error: + logging.error(str(error)) + return False + for line in ifile: + if match(re_term, line): + return True + return False + + # pylint: disable=broad-except + @staticmethod + def get_ip_and_host(): + """ + Use a mock socket connection to identify the Pi's hostname and IP address + """ + host = gethostname() + sock = socket(AF_INET, SOCK_DGRAM) + try: + # mock ip address; doesn't have to be reachable + sock.connect(('10.255.255.255', 1)) + ip_addr = sock.getsockname()[0] + except Exception: + ip_addr = False + finally: + sock.close() + return ip_addr, host + + @staticmethod + def get_logs(lines, scope): + """ + Takes (int) lines and (str) scope. + Returns either the decoded log output, or the stderr output of journalctl. + """ + if scope != "all": + process = run( + ["journalctl", "-n", lines, "-u", scope], + capture_output=True, + check=True, + ) + else: + process = run( + ["journalctl", "-n", lines], + capture_output=True, + check=True, + ) + + if process.returncode == 0: + return process.returncode, process.stdout.decode("utf-8") + + return process.returncode, process.stderr.decode("utf-8") diff --git a/python/oled/src/pi_cmds.py b/python/oled/src/pi_cmds.py deleted file mode 100644 index a33913db..00000000 --- a/python/oled/src/pi_cmds.py +++ /dev/null @@ -1,21 +0,0 @@ -""" -Module with methods that interact with the Pi's Linux system -""" - - -def get_ip_and_host(): - """ - Use a mock socket connection to identify the Pi's hostname and IP address - """ - from socket import socket, gethostname, AF_INET, SOCK_DGRAM - host = gethostname() - sock = socket(AF_INET, SOCK_DGRAM) - try: - # mock ip address; doesn't have to be reachable - sock.connect(('10.255.255.255', 1)) - ip_addr = sock.getsockname()[0] - except Exception: - ip_addr = False - finally: - sock.close() - return ip_addr, host diff --git a/python/oled/src/rascsi_oled_monitor.py b/python/oled/src/rascsi_oled_monitor.py index 80d0e86d..3201e768 100755 --- a/python/oled/src/rascsi_oled_monitor.py +++ b/python/oled/src/rascsi_oled_monitor.py @@ -39,9 +39,9 @@ from board import I2C from adafruit_ssd1306 import SSD1306_I2C from PIL import Image, ImageDraw, ImageFont from interrupt_handler import GracefulInterruptHandler -from pi_cmds import get_ip_and_host from rascsi.ractl_cmds import RaCtlCmds from rascsi.socket_cmds import SocketCmds +from rascsi.sys_cmds import SysCmds parser = argparse.ArgumentParser(description="RaSCSI OLED Monitor script") parser.add_argument( @@ -99,6 +99,7 @@ TOKEN = args.password sock_cmd = SocketCmds(host=args.rascsi_host, port=args.rascsi_port) ractl_cmd = RaCtlCmds(sock_cmd=sock_cmd, token=TOKEN) +sys_cmd = SysCmds() WIDTH = 128 BORDER = 5 @@ -159,7 +160,7 @@ LINE_SPACING = 8 # Some other nice fonts to try: http://www.dafont.com/bitmap.php FONT = ImageFont.truetype('resources/type_writer.ttf', FONT_SIZE) -IP_ADDR, HOSTNAME = get_ip_and_host() +IP_ADDR, HOSTNAME = sys_cmd.get_ip_and_host() REMOVABLE_DEVICE_TYPES = ractl_cmd.get_removable_device_types() PERIPHERAL_DEVICE_TYPES = ractl_cmd.get_peripheral_device_types() @@ -183,7 +184,7 @@ def formatted_output(): else: output.append(f"{line['id']} {line['device_type'][2:4]} {line['status']}") # Special handling of devices that don't use image files - elif line["device_type"] in (PERIPHERAL_DEVICE_TYPES): + elif line["device_type"] in PERIPHERAL_DEVICE_TYPES: if line["vendor"] == "RaSCSI": output.append(f"{line['id']} {line['device_type'][2:4]} {line['product']}") else: diff --git a/python/web/src/device_utils.py b/python/web/src/device_utils.py deleted file mode 100644 index 0d1d6cf8..00000000 --- a/python/web/src/device_utils.py +++ /dev/null @@ -1,90 +0,0 @@ -""" -Module for RaSCSI device management utility methods -""" - -from flask_babel import _ - - -def get_valid_scsi_ids(devices, reserved_ids): - """ - Takes a list of (dict)s devices, and list of (int)s reserved_ids. - Returns: - - (list) of (int)s valid_ids, which are the SCSI ids that are not reserved - - (int) recommended_id, which is the id that the Web UI should default to recommend - """ - occupied_ids = [] - for device in devices: - occupied_ids.append(device["id"]) - - unoccupied_ids = [i for i in list(range(8)) if i not in reserved_ids + occupied_ids] - unoccupied_ids.sort() - valid_ids = [i for i in list(range(8)) if i not in reserved_ids] - valid_ids.sort(reverse=True) - - if unoccupied_ids: - recommended_id = unoccupied_ids[-1] - else: - recommended_id = occupied_ids.pop(0) - - return valid_ids, recommended_id - - -def sort_and_format_devices(devices): - """ - Takes a (list) of (dict)s devices and returns a (list) of (dict)s. - Sorts by SCSI ID acending (0 to 7). - For SCSI IDs where no device is attached, inject a (dict) with placeholder text. - """ - occupied_ids = [] - for device in devices: - occupied_ids.append(device["id"]) - - formatted_devices = devices - - # Add padding devices and sort the list - for i in range(8): - if i not in occupied_ids: - formatted_devices.append({"id": i, "device_type": "-", \ - "status": "-", "file": "-", "product": "-"}) - # Sort list of devices by id - formatted_devices.sort(key=lambda dic: str(dic["id"])) - - return formatted_devices - - -def map_device_types_and_names(device_types): - """ - Takes a (dict) corresponding to the data structure returned by RaCtlCmds.get_device_types() - Returns a (dict) of device_type:device_name mappings of localized device names - """ - for key, value in device_types.items(): - device_types[key]["name"] = get_device_name(key) - - return device_types - - -def get_device_name(device_type): - """ - Takes a four letter device acronym (str) device_type. - Returns the human-readable name for the device type. - """ - if device_type == "SAHD": - return _("SASI Hard Disk") - elif device_type == "SCHD": - return _("SCSI Hard Disk") - elif device_type == "SCRM": - return _("Removable Disk") - elif device_type == "SCMO": - return _("Magneto-Optical") - elif device_type == "SCCD": - return _("CD / DVD") - elif device_type == "SCBR": - return _("X68000 Host Bridge") - elif device_type == "SCDP": - return _("DaynaPORT SCSI/Link") - elif device_type == "SCLP": - return _("Printer") - elif device_type == "SCHS": - return _("Host Services") - else: - return device_type diff --git a/python/web/src/pi_cmds.py b/python/web/src/pi_cmds.py deleted file mode 100644 index 55fe4d86..00000000 --- a/python/web/src/pi_cmds.py +++ /dev/null @@ -1,156 +0,0 @@ -""" -Module for methods controlling and getting information about the Pi's Linux system -""" - -import subprocess -import logging -from flask_babel import _ -from settings import AUTH_GROUP - - -def running_env(): - """ - Returns (str) git and (str) env - git contains the git hash of the checked out code - env is the various system information where this app is running - """ - try: - ra_git_version = ( - subprocess.run( - ["git", "rev-parse", "HEAD"], - capture_output=True, - check=True, - ) - .stdout.decode("utf-8") - .strip() - ) - except subprocess.CalledProcessError as error: - logging.warning("Executed shell command: %s", " ".join(error.cmd)) - logging.warning("Got error: %s", error.stderr.decode("utf-8")) - ra_git_version = "" - - try: - pi_version = ( - subprocess.run( - ["uname", "-a"], - capture_output=True, - check=True, - ) - .stdout.decode("utf-8") - .strip() - ) - except subprocess.CalledProcessError as error: - logging.warning("Executed shell command: %s", " ".join(error.cmd)) - logging.warning("Got error: %s", error.stderr.decode("utf-8")) - pi_version = "Unknown" - - return {"git": ra_git_version, "env": pi_version} - - -def running_proc(daemon): - """ - Takes (str) daemon - Returns (int) proc, which is the number of processes currently running - """ - try: - processes = ( - subprocess.run( - ["ps", "aux"], - capture_output=True, - check=True, - ) - .stdout.decode("utf-8") - .strip() - ) - except subprocess.CalledProcessError as error: - logging.warning("Executed shell command: %s", " ".join(error.cmd)) - logging.warning("Got error: %s", error.stderr.decode("utf-8")) - processes = "" - - from re import findall - matching_processes = findall(daemon, processes) - return len(matching_processes) - - -def is_bridge_setup(): - """ - Returns (bool) True if the rascsi_bridge network interface exists - """ - try: - bridges = ( - subprocess.run( - ["brctl", "show"], - capture_output=True, - check=True, - ) - .stdout.decode("utf-8") - .strip() - ) - except subprocess.CalledProcessError as error: - logging.warning("Executed shell command: %s", " ".join(error.cmd)) - logging.warning("Got error: %s", error.stderr.decode("utf-8")) - bridges = "" - - if "rascsi_bridge" in bridges: - return True - return False - - -def disk_space(): - """ - Returns a (dict) with (int) total (int) used (int) free - This is the disk space information of the volume where this app is running - """ - from shutil import disk_usage - total, used, free = disk_usage(__file__) - return {"total": total, "used": used, "free": free} - - -def get_ip_address(): - """ - Use a mock socket connection to identify the Pi's IP address - """ - from socket import socket, AF_INET, SOCK_DGRAM - sock = socket(AF_INET, SOCK_DGRAM) - try: - # mock ip address; doesn't have to be reachable - sock.connect(('10.255.255.255', 1)) - ip_addr = sock.getsockname()[0] - except Exception: - ip_addr = '127.0.0.1' - finally: - sock.close() - return ip_addr - - -def introspect_file(file_path, re_term): - """ - Takes a (str) file_path and (str) re_term in regex format - Will introspect file_path for the existance of re_term - and return True if found, False if not found - """ - from re import match - try: - ifile = open(file_path, "r", encoding="ISO-8859-1") - except: - return False - for line in ifile: - if match(re_term, line): - return True - return False - - -def auth_active(): - """ - Inspects if the group defined in AUTH_GROUP exists on the system. - If it exists, tell the webapp to enable authentication. - Returns a (dict) with (bool) status and (str) msg - """ - from grp import getgrall - groups = [g.gr_name for g in getgrall()] - if AUTH_GROUP in groups: - return { - "status": True, - "msg": _("You must log in to use this function"), - } - return {"status": False, "msg": ""} diff --git a/python/web/src/return_code_mapper.py b/python/web/src/return_code_mapper.py index a25a3f11..d42fad93 100644 --- a/python/web/src/return_code_mapper.py +++ b/python/web/src/return_code_mapper.py @@ -41,5 +41,8 @@ class ReturnCodeMapper: parameters = payload["parameters"] - payload["msg"] = lazy_gettext(ReturnCodeMapper.MESSAGES[payload["return_code"]], **parameters) + payload["msg"] = lazy_gettext( + ReturnCodeMapper.MESSAGES[payload["return_code"]], + **parameters, + ) return payload diff --git a/python/web/src/templates/base.html b/python/web/src/templates/base.html index da316687..9871e8d8 100644 --- a/python/web/src/templates/base.html +++ b/python/web/src/templates/base.html @@ -1,7 +1,7 @@
-+ |
- RaSCSI - 68kmla Edition+{{ _("RaSCSI Control Page") }} |
+ hostname: {{ host }} ip: {{ ip_addr }} + | +