2022-02-27 05:46:35 +00:00
|
|
|
"""
|
2022-12-05 17:58:23 +00:00
|
|
|
Module for PiSCSI Web Interface utility methods
|
2022-02-27 05:46:35 +00:00
|
|
|
"""
|
|
|
|
|
|
|
|
import logging
|
|
|
|
from grp import getgrall
|
|
|
|
from os import path
|
|
|
|
from pathlib import Path
|
2022-11-14 17:32:15 +00:00
|
|
|
from ua_parser import user_agent_parser
|
2022-02-27 05:46:35 +00:00
|
|
|
|
|
|
|
from flask import request, make_response
|
|
|
|
from flask_babel import _
|
|
|
|
from werkzeug.utils import secure_filename
|
|
|
|
|
2022-12-05 17:58:23 +00:00
|
|
|
from piscsi.sys_cmds import SysCmds
|
2022-02-27 05:46:35 +00:00
|
|
|
|
2022-11-30 05:19:17 +00:00
|
|
|
|
2022-02-27 05:46:35 +00:00
|
|
|
def get_valid_scsi_ids(devices, reserved_ids):
|
|
|
|
"""
|
|
|
|
Takes a list of (dict)s devices, and list of (int)s reserved_ids.
|
|
|
|
Returns:
|
|
|
|
- (list) of (int)s valid_ids, which are the SCSI ids that are not reserved
|
2022-10-06 17:00:57 +00:00
|
|
|
- (list) of (int)s occupied_ids, which are the SCSI ids were a device is attached
|
2022-02-27 05:46:35 +00:00
|
|
|
- (int) recommended_id, which is the id that the Web UI should default to recommend
|
|
|
|
"""
|
|
|
|
occupied_ids = []
|
|
|
|
for device in devices:
|
|
|
|
occupied_ids.append(device["id"])
|
|
|
|
|
|
|
|
unoccupied_ids = [i for i in list(range(8)) if i not in reserved_ids + occupied_ids]
|
|
|
|
unoccupied_ids.sort()
|
|
|
|
valid_ids = [i for i in list(range(8)) if i not in reserved_ids]
|
|
|
|
valid_ids.sort(reverse=True)
|
|
|
|
|
|
|
|
if unoccupied_ids:
|
|
|
|
recommended_id = unoccupied_ids[-1]
|
|
|
|
else:
|
2022-06-30 21:17:55 +00:00
|
|
|
if occupied_ids:
|
2022-10-06 17:00:57 +00:00
|
|
|
recommended_id = occupied_ids[0]
|
2022-06-30 21:17:55 +00:00
|
|
|
else:
|
|
|
|
recommended_id = 0
|
2022-02-27 05:46:35 +00:00
|
|
|
|
2022-10-06 17:00:57 +00:00
|
|
|
return {
|
|
|
|
"valid_ids": valid_ids,
|
|
|
|
"occupied_ids": occupied_ids,
|
|
|
|
"recommended_id": recommended_id,
|
2022-11-30 05:19:17 +00:00
|
|
|
}
|
2022-02-27 05:46:35 +00:00
|
|
|
|
|
|
|
|
|
|
|
def sort_and_format_devices(devices):
|
|
|
|
"""
|
|
|
|
Takes a (list) of (dict)s devices and returns a (list) of (dict)s.
|
|
|
|
Sorts by SCSI ID acending (0 to 7).
|
|
|
|
For SCSI IDs where no device is attached, inject a (dict) with placeholder text.
|
|
|
|
"""
|
|
|
|
occupied_ids = []
|
2022-10-03 18:47:08 +00:00
|
|
|
formatted_devices = []
|
2022-02-27 05:46:35 +00:00
|
|
|
for device in devices:
|
|
|
|
occupied_ids.append(device["id"])
|
2022-10-03 18:47:08 +00:00
|
|
|
device["device_name"] = get_device_name(device["device_type"])
|
|
|
|
formatted_devices.append(device)
|
|
|
|
|
|
|
|
# Add placeholder data for non-occupied IDs
|
|
|
|
for scsi_id in range(8):
|
|
|
|
if scsi_id not in occupied_ids:
|
|
|
|
formatted_devices.append(
|
|
|
|
{
|
|
|
|
"id": scsi_id,
|
|
|
|
"unit": "-",
|
|
|
|
"device_name": "-",
|
|
|
|
"status": "-",
|
|
|
|
"file": "-",
|
|
|
|
"product": "-",
|
|
|
|
}
|
|
|
|
)
|
2022-02-27 05:46:35 +00:00
|
|
|
|
|
|
|
return formatted_devices
|
|
|
|
|
|
|
|
|
|
|
|
def map_device_types_and_names(device_types):
|
|
|
|
"""
|
2022-12-05 17:58:23 +00:00
|
|
|
Takes a (dict) corresponding to the data structure returned by PiscsiCmds.get_device_types()
|
2022-02-27 05:46:35 +00:00
|
|
|
Returns a (dict) of device_type:device_name mappings of localized device names
|
|
|
|
"""
|
|
|
|
for device in device_types.keys():
|
|
|
|
device_types[device]["name"] = get_device_name(device)
|
|
|
|
|
|
|
|
return device_types
|
|
|
|
|
|
|
|
|
|
|
|
# pylint: disable=too-many-return-statements
|
|
|
|
def get_device_name(device_type):
|
|
|
|
"""
|
|
|
|
Takes a four letter device acronym (str) device_type.
|
|
|
|
Returns the human-readable name for the device type.
|
|
|
|
"""
|
|
|
|
if device_type == "SCHD":
|
2022-10-01 23:51:30 +00:00
|
|
|
return _("Hard Disk Drive")
|
2022-02-27 05:46:35 +00:00
|
|
|
if device_type == "SCRM":
|
2022-10-01 23:51:30 +00:00
|
|
|
return _("Removable Disk Drive")
|
2022-02-27 05:46:35 +00:00
|
|
|
if device_type == "SCMO":
|
2022-10-01 23:51:30 +00:00
|
|
|
return _("Magneto-Optical Drive")
|
2022-02-27 05:46:35 +00:00
|
|
|
if device_type == "SCCD":
|
2022-10-01 23:51:30 +00:00
|
|
|
return _("CD/DVD Drive")
|
2022-02-27 05:46:35 +00:00
|
|
|
if device_type == "SCBR":
|
2022-10-01 23:51:30 +00:00
|
|
|
return _("Host Bridge")
|
2022-02-27 05:46:35 +00:00
|
|
|
if device_type == "SCDP":
|
2022-10-03 18:47:08 +00:00
|
|
|
return _("Ethernet Adapter")
|
2022-02-27 05:46:35 +00:00
|
|
|
if device_type == "SCLP":
|
|
|
|
return _("Printer")
|
|
|
|
if device_type == "SCHS":
|
|
|
|
return _("Host Services")
|
|
|
|
return device_type
|
|
|
|
|
|
|
|
|
2022-09-22 00:14:53 +00:00
|
|
|
def map_image_file_descriptions(file_suffixes):
|
|
|
|
"""
|
|
|
|
Takes a (list) of (str) file suffixes for images file types.
|
|
|
|
Returns a (dict) with file suffix and description pairs, both (str)
|
|
|
|
"""
|
|
|
|
supported_image_types = {}
|
|
|
|
for suffix in file_suffixes:
|
|
|
|
supported_image_types[suffix] = get_image_description(suffix)
|
|
|
|
|
|
|
|
return supported_image_types
|
|
|
|
|
|
|
|
|
|
|
|
# pylint: disable=too-many-return-statements
|
|
|
|
def get_image_description(file_suffix):
|
|
|
|
"""
|
|
|
|
Takes a three char file suffix (str) file_suffix.
|
|
|
|
Returns the help text description for said file suffix.
|
|
|
|
"""
|
|
|
|
if file_suffix == "hds":
|
|
|
|
return _("Hard Disk Image (Generic)")
|
|
|
|
if file_suffix == "hda":
|
|
|
|
return _("Hard Disk Image (Apple)")
|
|
|
|
if file_suffix == "hdn":
|
|
|
|
return _("Hard Disk Image (NEC)")
|
|
|
|
if file_suffix == "hd1":
|
|
|
|
return _("Hard Disk Image (SCSI-1)")
|
|
|
|
if file_suffix == "hdr":
|
|
|
|
return _("Removable Disk Image")
|
|
|
|
if file_suffix == "mos":
|
|
|
|
return _("Magneto-Optical Disk Image")
|
|
|
|
return file_suffix
|
|
|
|
|
|
|
|
|
2022-10-01 23:51:30 +00:00
|
|
|
def format_drive_properties(drive_properties):
|
|
|
|
"""
|
|
|
|
Takes a (dict) with structured drive properties data
|
|
|
|
Returns a (dict) with the formatted properties, one (list) per device type
|
|
|
|
"""
|
|
|
|
hd_conf = []
|
|
|
|
cd_conf = []
|
|
|
|
rm_conf = []
|
|
|
|
mo_conf = []
|
|
|
|
|
|
|
|
for device in drive_properties:
|
2022-10-25 15:51:04 +00:00
|
|
|
# Fallback for when the properties data is corrupted, to avoid crashing the web app.
|
|
|
|
# The integration tests will catch this scenario, but relies on the web app not crashing.
|
|
|
|
if not device.get("name"):
|
|
|
|
device["name"] = ""
|
|
|
|
|
|
|
|
device["secure_name"] = secure_filename(device["name"])
|
|
|
|
|
|
|
|
if device.get("size"):
|
|
|
|
device["size_mb"] = f'{device["size"] / 1024 / 1024:,.2f}'
|
|
|
|
|
2022-10-01 23:51:30 +00:00
|
|
|
if device["device_type"] == "SCHD":
|
|
|
|
hd_conf.append(device)
|
|
|
|
elif device["device_type"] == "SCCD":
|
|
|
|
cd_conf.append(device)
|
|
|
|
elif device["device_type"] == "SCRM":
|
|
|
|
rm_conf.append(device)
|
|
|
|
elif device["device_type"] == "SCMO":
|
|
|
|
mo_conf.append(device)
|
|
|
|
|
|
|
|
return {
|
|
|
|
"hd_conf": hd_conf,
|
|
|
|
"cd_conf": cd_conf,
|
|
|
|
"rm_conf": rm_conf,
|
|
|
|
"mo_conf": mo_conf,
|
2022-11-30 05:19:17 +00:00
|
|
|
}
|
|
|
|
|
2022-10-01 23:51:30 +00:00
|
|
|
|
2022-10-06 21:04:41 +00:00
|
|
|
def get_properties_by_drive_name(drives, drive_name):
|
|
|
|
"""
|
|
|
|
Takes (list) of (dict) drives, and (str) drive_name
|
|
|
|
Returns (dict) with the collection of drive properties that matches drive_name
|
|
|
|
"""
|
|
|
|
drives.sort(key=lambda item: item.get("name"))
|
|
|
|
|
|
|
|
for drive in drives:
|
|
|
|
if drive["name"] == drive_name:
|
2022-11-16 06:28:27 +00:00
|
|
|
return {
|
|
|
|
"file_type": drive["file_type"],
|
|
|
|
"vendor": drive["vendor"],
|
|
|
|
"product": drive["product"],
|
|
|
|
"revision": drive["revision"],
|
|
|
|
"block_size": drive["block_size"],
|
|
|
|
"size": drive["size"],
|
2022-11-30 05:19:17 +00:00
|
|
|
}
|
2022-10-06 21:04:41 +00:00
|
|
|
|
2022-11-16 06:28:27 +00:00
|
|
|
logging.error("Properties for drive '%s' does not exist in database", drive_name)
|
|
|
|
return False
|
2022-10-06 21:04:41 +00:00
|
|
|
|
2022-11-30 05:19:17 +00:00
|
|
|
|
2022-02-27 05:46:35 +00:00
|
|
|
def auth_active(group):
|
|
|
|
"""
|
|
|
|
Inspects if the group defined in (str) group exists on the system.
|
|
|
|
If it exists, tell the webapp to enable authentication.
|
|
|
|
Returns a (dict) with (bool) status and (str) msg
|
|
|
|
"""
|
|
|
|
groups = [g.gr_name for g in getgrall()]
|
|
|
|
if group in groups:
|
|
|
|
return {
|
2022-11-30 05:19:17 +00:00
|
|
|
"status": True,
|
|
|
|
"msg": _("You must log in to use this function"),
|
|
|
|
}
|
2022-02-27 05:46:35 +00:00
|
|
|
return {"status": False, "msg": ""}
|
|
|
|
|
|
|
|
|
|
|
|
def is_bridge_configured(interface):
|
|
|
|
"""
|
|
|
|
Takes (str) interface of a network device being attached.
|
2022-10-05 21:14:48 +00:00
|
|
|
Returns a (dict) with (bool) status and (str) msg
|
2022-02-27 05:46:35 +00:00
|
|
|
"""
|
2022-10-24 02:05:29 +00:00
|
|
|
PATH_SYSCTL = "/etc/sysctl.conf"
|
|
|
|
PATH_IPTV4 = "/etc/iptables/rules.v4"
|
|
|
|
PATH_DHCPCD = "/etc/dhcpcd.conf"
|
2022-12-05 17:58:23 +00:00
|
|
|
PATH_BRIDGE = "/etc/network/interfaces.d/piscsi_bridge"
|
2022-10-24 02:05:29 +00:00
|
|
|
return_msg = _("Configure the network bridge for %(interface)s first: ", interface=interface)
|
|
|
|
to_configure = []
|
2022-02-27 05:46:35 +00:00
|
|
|
sys_cmd = SysCmds()
|
|
|
|
if interface.startswith("wlan"):
|
2022-10-24 02:05:29 +00:00
|
|
|
if not sys_cmd.introspect_file(PATH_SYSCTL, r"^net\.ipv4\.ip_forward=1$"):
|
|
|
|
to_configure.append("IPv4 forwarding")
|
|
|
|
if not Path(PATH_IPTV4).is_file():
|
|
|
|
to_configure.append("NAT")
|
2022-02-27 05:46:35 +00:00
|
|
|
else:
|
2022-10-24 02:05:29 +00:00
|
|
|
if not sys_cmd.introspect_file(PATH_DHCPCD, r"^denyinterfaces " + interface + r"$"):
|
|
|
|
to_configure.append(PATH_DHCPCD)
|
|
|
|
if not Path(PATH_BRIDGE).is_file():
|
|
|
|
to_configure.append(PATH_BRIDGE)
|
|
|
|
|
|
|
|
if to_configure:
|
|
|
|
return {"status": False, "msg": return_msg + ", ".join(to_configure)}
|
|
|
|
|
|
|
|
return {"status": True, "msg": ""}
|
|
|
|
|
|
|
|
|
|
|
|
def is_safe_path(file_name):
|
|
|
|
"""
|
|
|
|
Takes (Path) file_name with the path to a file on the file system
|
|
|
|
Returns True if the path is safe
|
|
|
|
Returns False if the path is either absolute, or tries to traverse the file system
|
|
|
|
"""
|
|
|
|
if file_name.is_absolute() or ".." in str(file_name):
|
|
|
|
return {
|
|
|
|
"status": False,
|
|
|
|
"msg": _("%(file_name)s is not a valid path", file_name=file_name),
|
|
|
|
}
|
|
|
|
|
|
|
|
return {"status": True, "msg": ""}
|
2022-02-27 05:46:35 +00:00
|
|
|
|
|
|
|
|
|
|
|
def upload_with_dropzonejs(image_dir):
|
|
|
|
"""
|
|
|
|
Takes (str) image_dir which is the path to the image dir to store files.
|
|
|
|
Opens a stream to transfer a file via the embedded dropzonejs library.
|
|
|
|
"""
|
|
|
|
log = logging.getLogger("pydrop")
|
|
|
|
file_object = request.files["file"]
|
|
|
|
file_name = secure_filename(file_object.filename)
|
|
|
|
|
|
|
|
save_path = path.join(image_dir, file_name)
|
2022-11-30 05:19:17 +00:00
|
|
|
current_chunk = int(request.form["dzchunkindex"])
|
2022-02-27 05:46:35 +00:00
|
|
|
|
|
|
|
# Makes sure not to overwrite an existing file,
|
|
|
|
# but continues writing to a file transfer in progress
|
|
|
|
if path.exists(save_path) and current_chunk == 0:
|
|
|
|
return make_response(_("The file already exists!"), 400)
|
|
|
|
|
|
|
|
try:
|
|
|
|
with open(save_path, "ab") as save:
|
|
|
|
save.seek(int(request.form["dzchunkbyteoffset"]))
|
|
|
|
save.write(file_object.stream.read())
|
|
|
|
except OSError:
|
|
|
|
log.exception("Could not write to file")
|
|
|
|
return make_response(_("Unable to write the file to disk!"), 500)
|
|
|
|
|
|
|
|
total_chunks = int(request.form["dztotalchunkcount"])
|
|
|
|
|
|
|
|
if current_chunk + 1 == total_chunks:
|
|
|
|
# Validate the resulting file size after writing the last chunk
|
|
|
|
if path.getsize(save_path) != int(request.form["dztotalfilesize"]):
|
2022-10-09 20:50:20 +00:00
|
|
|
log.error("File size mismatch between the original file and transferred file.")
|
2022-02-27 05:46:35 +00:00
|
|
|
return make_response(_("Transferred file corrupted!"), 500)
|
|
|
|
|
|
|
|
return make_response(_("File upload successful!"), 200)
|
2022-11-14 17:32:15 +00:00
|
|
|
|
|
|
|
|
|
|
|
def browser_supports_modern_themes():
|
|
|
|
"""
|
|
|
|
Determines if the browser supports the HTML/CSS/JS features used in non-legacy themes.
|
|
|
|
"""
|
|
|
|
user_agent_string = request.headers.get("User-Agent")
|
|
|
|
if not user_agent_string:
|
|
|
|
return False
|
|
|
|
|
|
|
|
user_agent = user_agent_parser.Parse(user_agent_string)
|
2022-11-30 05:19:17 +00:00
|
|
|
if not user_agent["user_agent"]["family"]:
|
2022-11-14 17:32:15 +00:00
|
|
|
return False
|
|
|
|
|
|
|
|
# (family, minimum version)
|
|
|
|
supported_browsers = [
|
2022-11-30 05:19:17 +00:00
|
|
|
("Safari", 14),
|
|
|
|
("Chrome", 100),
|
|
|
|
("Firefox", 100),
|
|
|
|
("Edge", 100),
|
|
|
|
("Mobile Safari", 14),
|
|
|
|
("Chrome Mobile", 100),
|
2022-11-14 17:32:15 +00:00
|
|
|
]
|
|
|
|
|
2022-11-30 05:19:17 +00:00
|
|
|
current_ua_family = user_agent["user_agent"]["family"]
|
2022-12-04 14:31:57 +00:00
|
|
|
current_ua_version = user_agent["user_agent"]["major"]
|
2022-11-14 17:32:15 +00:00
|
|
|
logging.info(f"Identified browser as family={current_ua_family}, version={current_ua_version}")
|
|
|
|
|
2022-12-04 14:31:57 +00:00
|
|
|
# Supported browsers cannot be identified without a version
|
|
|
|
if not current_ua_version:
|
|
|
|
return False
|
|
|
|
|
2022-11-14 17:32:15 +00:00
|
|
|
for supported_browser, supported_version in supported_browsers:
|
2022-12-04 14:31:57 +00:00
|
|
|
if (
|
|
|
|
current_ua_family == supported_browser
|
|
|
|
and float(current_ua_version) >= supported_version
|
|
|
|
):
|
2022-11-14 17:32:15 +00:00
|
|
|
return True
|
|
|
|
return False
|