mirror of
https://github.com/akuker/RASCSI.git
synced 2025-07-13 02:23:54 +00:00
Create SysCmds common class, and refactor Python codebase (#697)
* Move the oled script's PiCmds module to common, and rename it SysCmds. * Use sys_cmds.get_ip_and_host() in web UI code. * Move the auth_active() method to device_utils * Rename device_utils to web_utils. Make auth_active() method take the group as argument. * Migrate all pi_cmds methods to the SysCmds common class. * Display hostname and ip in Web UI. * Resolve or suppress pylint warnings. * Resolve a pylint warning. * Resolve or suppress pylint warnings. * Import libraries at the top level for readability. In my testing on a Pi3B+, this leads to ~1.5k more memory being used by the python3 process. * Change page title as requested by akuker. * Reenable the import-outside-toplevel pylint rule. * Resolve pylint warnings. * Fix error following refactoring. * Minor UI tweaks. * Cleanup. * Break out bridge config validation into a utility method. * Move the dropzonejs method into the web_utils package * Move get_logs method into SysCmds class. * Improve get logs UI. * Resolve pylint warning. * Standardize class instance name.
This commit is contained in:
183
python/web/src/web_utils.py
Normal file
183
python/web/src/web_utils.py
Normal file
@ -0,0 +1,183 @@
|
||||
"""
|
||||
Module for RaSCSI Web Interface utility methods
|
||||
"""
|
||||
|
||||
import logging
|
||||
from grp import getgrall
|
||||
from os import path
|
||||
from pathlib import Path
|
||||
|
||||
from flask import request, make_response
|
||||
from flask_babel import _
|
||||
from werkzeug.utils import secure_filename
|
||||
|
||||
from rascsi.sys_cmds import SysCmds
|
||||
|
||||
def get_valid_scsi_ids(devices, reserved_ids):
|
||||
"""
|
||||
Takes a list of (dict)s devices, and list of (int)s reserved_ids.
|
||||
Returns:
|
||||
- (list) of (int)s valid_ids, which are the SCSI ids that are not reserved
|
||||
- (int) recommended_id, which is the id that the Web UI should default to recommend
|
||||
"""
|
||||
occupied_ids = []
|
||||
for device in devices:
|
||||
occupied_ids.append(device["id"])
|
||||
|
||||
unoccupied_ids = [i for i in list(range(8)) if i not in reserved_ids + occupied_ids]
|
||||
unoccupied_ids.sort()
|
||||
valid_ids = [i for i in list(range(8)) if i not in reserved_ids]
|
||||
valid_ids.sort(reverse=True)
|
||||
|
||||
if unoccupied_ids:
|
||||
recommended_id = unoccupied_ids[-1]
|
||||
else:
|
||||
recommended_id = occupied_ids.pop(0)
|
||||
|
||||
return valid_ids, recommended_id
|
||||
|
||||
|
||||
def sort_and_format_devices(devices):
|
||||
"""
|
||||
Takes a (list) of (dict)s devices and returns a (list) of (dict)s.
|
||||
Sorts by SCSI ID acending (0 to 7).
|
||||
For SCSI IDs where no device is attached, inject a (dict) with placeholder text.
|
||||
"""
|
||||
occupied_ids = []
|
||||
for device in devices:
|
||||
occupied_ids.append(device["id"])
|
||||
|
||||
formatted_devices = devices
|
||||
|
||||
# Add padding devices and sort the list
|
||||
for i in range(8):
|
||||
if i not in occupied_ids:
|
||||
formatted_devices.append({"id": i, "device_type": "-", \
|
||||
"status": "-", "file": "-", "product": "-"})
|
||||
# Sort list of devices by id
|
||||
formatted_devices.sort(key=lambda dic: str(dic["id"]))
|
||||
|
||||
return formatted_devices
|
||||
|
||||
|
||||
def map_device_types_and_names(device_types):
|
||||
"""
|
||||
Takes a (dict) corresponding to the data structure returned by RaCtlCmds.get_device_types()
|
||||
Returns a (dict) of device_type:device_name mappings of localized device names
|
||||
"""
|
||||
for device in device_types.keys():
|
||||
device_types[device]["name"] = get_device_name(device)
|
||||
|
||||
return device_types
|
||||
|
||||
|
||||
# pylint: disable=too-many-return-statements
|
||||
def get_device_name(device_type):
|
||||
"""
|
||||
Takes a four letter device acronym (str) device_type.
|
||||
Returns the human-readable name for the device type.
|
||||
"""
|
||||
if device_type == "SAHD":
|
||||
return _("SASI Hard Disk")
|
||||
if device_type == "SCHD":
|
||||
return _("SCSI Hard Disk")
|
||||
if device_type == "SCRM":
|
||||
return _("Removable Disk")
|
||||
if device_type == "SCMO":
|
||||
return _("Magneto-Optical")
|
||||
if device_type == "SCCD":
|
||||
return _("CD / DVD")
|
||||
if device_type == "SCBR":
|
||||
return _("X68000 Host Bridge")
|
||||
if device_type == "SCDP":
|
||||
return _("DaynaPORT SCSI/Link")
|
||||
if device_type == "SCLP":
|
||||
return _("Printer")
|
||||
if device_type == "SCHS":
|
||||
return _("Host Services")
|
||||
return device_type
|
||||
|
||||
|
||||
def auth_active(group):
|
||||
"""
|
||||
Inspects if the group defined in (str) group exists on the system.
|
||||
If it exists, tell the webapp to enable authentication.
|
||||
Returns a (dict) with (bool) status and (str) msg
|
||||
"""
|
||||
groups = [g.gr_name for g in getgrall()]
|
||||
if group in groups:
|
||||
return {
|
||||
"status": True,
|
||||
"msg": _("You must log in to use this function"),
|
||||
}
|
||||
return {"status": False, "msg": ""}
|
||||
|
||||
|
||||
def is_bridge_configured(interface):
|
||||
"""
|
||||
Takes (str) interface of a network device being attached.
|
||||
Returns (bool) False if the network bridge is configured.
|
||||
Returns (str) with an error message if the network bridge is not configured.
|
||||
"""
|
||||
sys_cmd = SysCmds()
|
||||
if interface.startswith("wlan"):
|
||||
if not sys_cmd.introspect_file("/etc/sysctl.conf", r"^net\.ipv4\.ip_forward=1$"):
|
||||
return _("Configure IPv4 forwarding before using a wireless network device.")
|
||||
if not Path("/etc/iptables/rules.v4").is_file():
|
||||
return _("Configure NAT before using a wireless network device.")
|
||||
else:
|
||||
if not sys_cmd.introspect_file(
|
||||
"/etc/dhcpcd.conf",
|
||||
r"^denyinterfaces " + interface + r"$",
|
||||
):
|
||||
return _("Configure the network bridge before using a wired network device.")
|
||||
if not Path("/etc/network/interfaces.d/rascsi_bridge").is_file():
|
||||
return _("Configure the network bridge before using a wired network device.")
|
||||
return False
|
||||
|
||||
|
||||
def upload_with_dropzonejs(image_dir):
|
||||
"""
|
||||
Takes (str) image_dir which is the path to the image dir to store files.
|
||||
Opens a stream to transfer a file via the embedded dropzonejs library.
|
||||
"""
|
||||
log = logging.getLogger("pydrop")
|
||||
file_object = request.files["file"]
|
||||
file_name = secure_filename(file_object.filename)
|
||||
|
||||
save_path = path.join(image_dir, file_name)
|
||||
current_chunk = int(request.form['dzchunkindex'])
|
||||
|
||||
# Makes sure not to overwrite an existing file,
|
||||
# but continues writing to a file transfer in progress
|
||||
if path.exists(save_path) and current_chunk == 0:
|
||||
return make_response(_("The file already exists!"), 400)
|
||||
|
||||
try:
|
||||
with open(save_path, "ab") as save:
|
||||
save.seek(int(request.form["dzchunkbyteoffset"]))
|
||||
save.write(file_object.stream.read())
|
||||
except OSError:
|
||||
log.exception("Could not write to file")
|
||||
return make_response(_("Unable to write the file to disk!"), 500)
|
||||
|
||||
total_chunks = int(request.form["dztotalchunkcount"])
|
||||
|
||||
if current_chunk + 1 == total_chunks:
|
||||
# Validate the resulting file size after writing the last chunk
|
||||
if path.getsize(save_path) != int(request.form["dztotalfilesize"]):
|
||||
log.error(
|
||||
"Finished transferring %s, "
|
||||
"but it has a size mismatch with the original file. "
|
||||
"Got %s but we expected %s.",
|
||||
file_object.filename,
|
||||
path.getsize(save_path),
|
||||
request.form['dztotalfilesize'],
|
||||
)
|
||||
return make_response(_("Transferred file corrupted!"), 500)
|
||||
|
||||
log.info("File %s has been uploaded successfully", file_object.filename)
|
||||
log.debug("Chunk %s of %s for file %s completed.",
|
||||
current_chunk + 1, total_chunks, file_object.filename)
|
||||
|
||||
return make_response(_("File upload successful!"), 200)
|
Reference in New Issue
Block a user