Web UI: More file path sanitation, better network bridge warnings, each endpoint return one message (#932)

- Sanitize file paths with Path: for flat file structures, always extract Path().name, and for nested file structures either look for absolute paths, or someone trying to use ".." to traverse the dir strucutre.
- Reduce redundancy in network bridge detection method, and return somewhat more informative messages
- Make all endpoints return exactly one message
- Move some warning messages to logging
- Use tempfile for iso generation temp file handling
This commit is contained in:
Daniel Markstedt 2022-10-23 19:05:29 -07:00 committed by GitHub
parent f3553c5480
commit 5172d167e7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 323 additions and 233 deletions

View File

@ -2,17 +2,17 @@
Module for methods reading from and writing to the file system
"""
import os
import logging
import asyncio
from os import path, walk
from functools import lru_cache
from pathlib import PurePath, Path
from zipfile import ZipFile, is_zipfile
from time import time
from subprocess import run, CalledProcessError
from json import dump, load
from shutil import copyfile
from urllib.parse import quote
from tempfile import TemporaryDirectory
import requests
@ -54,14 +54,14 @@ class FileCmds:
index 0 is (str) file name and index 1 is (int) size in bytes
"""
files_list = []
for path, _dirs, files in os.walk(dir_path):
for file_path, _dirs, files in walk(dir_path):
# Only list selected file types
files = [f for f in files if f.lower().endswith(file_types)]
files_list.extend(
[
(
file,
os.path.getsize(os.path.join(path, file))
path.getsize(path.join(file_path, file))
)
for file in files
]
@ -75,7 +75,7 @@ class FileCmds:
Returns a (list) of (str) files_list
"""
files_list = []
for _root, _dirs, files in os.walk(CFG_DIR):
for _root, _dirs, files in walk(CFG_DIR):
for file in files:
if file.endswith("." + CONFIG_FILE_SUFFIX):
files_list.append(file)
@ -375,63 +375,61 @@ class FileCmds:
server_info = self.ractl.get_server_info()
file_name = PurePath(url).name
tmp_ts = int(time())
tmp_dir = "/tmp/" + str(tmp_ts) + "/"
os.mkdir(tmp_dir)
tmp_full_path = tmp_dir + file_name
iso_filename = f"{server_info['image_dir']}/{file_name}.iso"
iso_filename = Path(server_info['image_dir']) / f"{file_name}.iso"
req_proc = self.download_to_dir(quote(url, safe=URL_SAFE), tmp_dir, file_name)
with TemporaryDirectory() as tmp_dir:
req_proc = self.download_to_dir(quote(url, safe=URL_SAFE), tmp_dir, file_name)
logging.info("Downloaded %s to %s", file_name, tmp_dir)
if not req_proc["status"]:
return {"status": False, "msg": req_proc["msg"]}
if not req_proc["status"]:
return {"status": False, "msg": req_proc["msg"]}
if is_zipfile(tmp_full_path):
if "XtraStuf.mac" in str(ZipFile(tmp_full_path).namelist()):
logging.info("MacZip file format detected. Will not unzip to retain resource fork.")
else:
logging.info(
"%s is a zipfile! Will attempt to unzip and store the resulting files.",
tmp_full_path,
)
unzip_proc = asyncio.run(self.run_async("unzip", [
"-d",
tmp_dir,
"-n",
tmp_full_path,
]))
if not unzip_proc["returncode"]:
tmp_full_path = Path(tmp_dir) / file_name
if is_zipfile(tmp_full_path):
if "XtraStuf.mac" in str(ZipFile(str(tmp_full_path)).namelist()):
logging.info("MacZip file format detected. Will not unzip to retain resource fork.")
else:
logging.info(
"%s was successfully unzipped. Deleting the zipfile.",
"%s is a zipfile! Will attempt to unzip and store the resulting files.",
tmp_full_path,
)
self.delete_file(Path(tmp_full_path))
unzip_proc = asyncio.run(self.run_async("unzip", [
"-d",
str(tmp_dir),
"-n",
str(tmp_full_path),
]))
if not unzip_proc["returncode"]:
logging.info(
"%s was successfully unzipped. Deleting the zipfile.",
tmp_full_path,
)
tmp_full_path.unlink(True)
try:
run(
[
"genisoimage",
*iso_args,
"-o",
iso_filename,
tmp_dir,
],
capture_output=True,
check=True,
)
except CalledProcessError as error:
logging.warning(SHELL_ERROR, " ".join(error.cmd), error.stderr.decode("utf-8"))
return {"status": False, "msg": error.stderr.decode("utf-8")}
try:
run(
[
"genisoimage",
*iso_args,
"-o",
str(iso_filename),
tmp_dir,
],
capture_output=True,
check=True,
)
except CalledProcessError as error:
logging.warning(SHELL_ERROR, " ".join(error.cmd), error.stderr.decode("utf-8"))
return {"status": False, "msg": error.stderr.decode("utf-8")}
parameters = {
"value": " ".join(iso_args)
}
return {
"status": True,
"return_code": ReturnCodes.DOWNLOADFILETOISO_SUCCESS,
"parameters": parameters,
"file_name": iso_filename,
}
parameters = {
"value": " ".join(iso_args)
}
return {
"status": True,
"return_code": ReturnCodes.DOWNLOADFILETOISO_SUCCESS,
"parameters": parameters,
"file_name": iso_filename.name,
}
# noinspection PyMethodMayBeStatic
def download_to_dir(self, url, save_dir, file_name):

View File

@ -35,7 +35,7 @@
<p>
<form action="/config/save" method="post">
<label for="config_save_name">{{ _("File name") }}</label>
<input name="name" id="config_save_name" placeholder="default" size="20">
<input name="name" id="config_save_name" value="default" size="20">
.{{ CONFIG_FILE_SUFFIX }}
<input type="submit" value="{{ _("Save") }}">
</form>
@ -522,22 +522,22 @@
<input name="url" id="iso_url" required="" type="url">
<label for="iso_type">{{ _("Type:") }}</label>
<select name="type" id="iso_type">
<option value="-hfs">
<option value="HFS">
HFS
</option>
<option value="-iso-level 1">
<option value="ISO-9660 Level 1">
ISO-9660 Level 1
</option>
<option value="-iso-level 2">
<option value="ISO-9660 Level 2">
ISO-9660 Level 2
</option>
<option value="-iso-level 3">
<option value="ISO-9660 Level 3">
ISO-9660 Level 3
</option>
<option value="-J">
<option value="Joliet">
Joliet
</option>
<option value="-r">
<option value="Rock Ridge">
Rock Ridge
</option>
</select>

View File

@ -54,6 +54,7 @@ from web_utils import (
get_properties_by_drive_name,
auth_active,
is_bridge_configured,
is_safe_path,
upload_with_dropzonejs,
)
from settings import (
@ -336,7 +337,7 @@ def drive_create():
Creates the image and properties file pair
"""
drive_name = request.form.get("drive_name")
file_name = request.form.get("file_name")
file_name = Path(request.form.get("file_name")).name
properties = get_properties_by_drive_name(
APP.config["RASCSI_DRIVE_PROPERTIES"],
@ -360,11 +361,9 @@ def drive_create():
if not process["status"]:
return response(error=True, message=process["msg"])
# TODO: Refactor the return messages into one string
return response(message=[
(_("Image file created: %(file_name)s", file_name=full_file_name), "success"),
(process["msg"], "success"),
])
return response(message=
_("Image file with properties created: %(file_name)s", file_name=full_file_name)
)
@APP.route("/drive/cdrom", methods=["POST"])
@ -374,7 +373,7 @@ def drive_cdrom():
Creates a properties file for a CD-ROM image
"""
drive_name = request.form.get("drive_name")
file_name = request.form.get("file_name")
file_name = Path(request.form.get("file_name")).name
# Creating the drive properties file
file_name = f"{file_name}.{PROPERTIES_SUFFIX}"
@ -396,8 +395,7 @@ def config_save():
"""
Saves a config file to disk
"""
file_name = request.form.get("name") or "default"
file_name = f"{file_name}.{CONFIG_FILE_SUFFIX}"
file_name = Path(request.form.get("name") + f".{CONFIG_FILE_SUFFIX}").name
process = file_cmd.write_config(file_name)
process = ReturnCodeMapper.add_msg(process)
@ -413,7 +411,7 @@ def config_load():
"""
Loads a config file from disk
"""
file_name = request.form.get("name")
file_name = Path(request.form.get("name")).name
if "load" in request.form:
process = file_cmd.read_config(file_name)
@ -440,18 +438,18 @@ def show_diskinfo():
"""
Displays disk image info
"""
file_name = request.form.get("file_name")
file_name = Path(request.form.get("file_name"))
safe_path = is_safe_path(file_name)
if not safe_path["status"]:
return response(error=True, message=safe_path["msg"])
server_info = ractl_cmd.get_server_info()
returncode, diskinfo = sys_cmd.get_diskinfo(
server_info["image_dir"] +
"/" +
file_name
Path(server_info["image_dir"]) / file_name
)
if returncode == 0:
return response(
template="diskinfo.html",
file_name=file_name,
file_name=str(file_name),
diskinfo=diskinfo,
)
@ -580,17 +578,10 @@ def attach_device():
if param:
params.update({item.replace(PARAM_PREFIX, ""): param})
error_url = "https://github.com/akuker/RASCSI/wiki/Dayna-Port-SCSI-Link"
error_msg = _("Please follow the instructions at %(url)s", url=error_url)
if "interface" in params.keys():
bridge_status = is_bridge_configured(params["interface"])
if not bridge_status["status"]:
# TODO: Refactor the return messages into one string
return response(error=True, message=[
(bridge_status["msg"], "error"),
(error_msg, "error")
])
return response(error=True, message=bridge_status["msg"])
kwargs = {
"unit": int(unit),
@ -658,32 +649,26 @@ def attach_image():
process = ractl_cmd.attach_device(scsi_id, **kwargs)
process = ReturnCodeMapper.add_msg(process)
if process["status"]:
response_messages = [(_(
"Attached %(file_name)s as %(device_type)s to "
"SCSI ID %(id_number)s LUN %(unit_number)s",
file_name=file_name,
device_type=get_device_name(device_type),
id_number=scsi_id,
unit_number=unit,
), "success")]
if int(file_size) % int(expected_block_size):
response_messages.append((_(
"The image file size %(file_size)s bytes is not a multiple of "
"%(block_size)s. RaSCSI will ignore the trailing data. "
logging.warning(
"The image file size %s bytes is not a multiple of %s. "
"RaSCSI will ignore the trailing data. "
"The image may be corrupted, so proceed with caution.",
file_size=file_size,
block_size=expected_block_size,
), "warning"))
file_size,
expected_block_size,
)
return response(
message=_(
"Attached %(file_name)s as %(device_type)s to "
"SCSI ID %(id_number)s LUN %(unit_number)s",
file_name=file_name,
device_type=get_device_name(device_type),
id_number=scsi_id,
unit_number=unit,
)
)
return response(message=response_messages)
# TODO: Refactor the return messages into one string
return response(error=True, message=[
(_("Failed to attach %(file_name)s to SCSI ID %(id_number)s LUN %(unit_number)s",
file_name=file_name, id_number=scsi_id, unit_number=unit), "error"),
(process["msg"], "error"),
])
return response(error=True, message=process["msg"])
@APP.route("/scsi/detach_all", methods=["POST"])
@ -712,12 +697,7 @@ def detach():
return response(message=_("Detached SCSI ID %(id_number)s LUN %(unit_number)s",
id_number=scsi_id, unit_number=unit))
# TODO: Refactor the return messages into one string
return response(error=True, message=[
(_("Failed to detach SCSI ID %(id_number)s LUN %(unit_number)s",
id_number=scsi_id, unit_number=unit), "error"),
(process["msg"], "error"),
])
return response(error=True, message=process["msg"])
@APP.route("/scsi/eject", methods=["POST"])
@ -734,12 +714,7 @@ def eject():
return response(message=_("Ejected SCSI ID %(id_number)s LUN %(unit_number)s",
id_number=scsi_id, unit_number=unit))
# TODO: Refactor the return messages into one string
return response(error=True, message=[
(_("Failed to eject SCSI ID %(id_number)s LUN %(unit_number)s",
id_number=scsi_id, unit_number=unit), "error"),
(process["msg"], "error"),
])
return response(error=True, message=process["msg"])
@APP.route("/scsi/info", methods=["POST"])
@ -772,11 +747,7 @@ def reserve_id():
RESERVATIONS[int(scsi_id)] = memo
return response(message=_("Reserved SCSI ID %(id_number)s", id_number=scsi_id))
# TODO: Refactor the return messages into one string
return response(error=True, message=[
(_("Failed to reserve SCSI ID %(id_number)s", id_number=scsi_id), "error"),
(process["msg"], "error"),
])
return response(error=True, message=process["msg"])
@APP.route("/scsi/release", methods=["POST"])
@ -793,11 +764,7 @@ def release_id():
RESERVATIONS[int(scsi_id)] = ""
return response(message=_("Released the reservation for SCSI ID %(id_number)s", id_number=scsi_id))
# TODO: Refactor the return messages into one string
return response(error=True, message=[
(_("Failed to release the reservation for SCSI ID %(id_number)s", id_number=scsi_id), "error"),
(process["msg"], "error"),
])
return response(error=True, message=process["msg"])
@APP.route("/pi/reboot", methods=["POST"])
@ -824,24 +791,40 @@ def shutdown():
@login_required
def download_to_iso():
"""
Downloads a remote file and creates a CD-ROM image formatted with HFS that contains the file
Downloads a file and creates a CD-ROM image with the specified file system and the file
"""
scsi_id = request.form.get("scsi_id")
url = request.form.get("url")
iso_args = request.form.get("type").split()
response_messages = []
iso_type = request.form.get("type")
if iso_type == "HFS":
iso_args = ["-hfs"]
elif iso_type == "ISO-9660 Level 1":
iso_args = ["-iso-level", "1"]
elif iso_type == "ISO-9660 Level 2":
iso_args = ["-iso-level", "2"]
elif iso_type == "ISO-9660 Level 3":
iso_args = ["-iso-level", "3"]
elif iso_type == "Joliet":
iso_args = ["-J"]
elif iso_type == "Rock Ridge":
iso_args = ["-r"]
else:
return response(
error=True,
message=_("%(iso_type)s is not a valid CD-ROM format.", iso_type=iso_type)
)
process = file_cmd.download_file_to_iso(url, *iso_args)
process = ReturnCodeMapper.add_msg(process)
if not process["status"]:
# TODO: Refactor the return messages into one string
return response(error=True, message=[
(_("Failed to create CD-ROM image from %(url)s", url=url), "error"),
(process["msg"], "error"),
])
response_messages.append((process["msg"], "success"))
response_messages.append((_("Saved image as: %(file_name)s", file_name=process['file_name']), "success"))
return response(
error=True,
message=_(
"The following error occurred when creating the CD-ROM image: %(error)s",
error=process["msg"],
),
)
process_attach = ractl_cmd.attach_device(
scsi_id,
@ -850,14 +833,26 @@ def download_to_iso():
)
process_attach = ReturnCodeMapper.add_msg(process_attach)
if process_attach["status"]:
response_messages.append((_("Attached to SCSI ID %(id_number)s", id_number=scsi_id), "success"))
return response(message=response_messages)
return response(
message=_(
"CD-ROM image %(file_name)s with type %(iso_type)s was created "
"and attached to SCSI ID %(id_number)s",
file_name=process["file_name"],
iso_type=iso_type,
id_number=scsi_id,
),
)
# TODO: Refactor the return messages into one string
return response(error=True, message=[
(_("Failed to attach image to SCSI ID %(id_number)s. Try attaching it manually.", id_number=scsi_id), "error"),
(process_attach["msg"], "error"),
])
return response(
error=True,
message=_(
"CD-ROM image %(file_name)s with type %(iso_type)s was created "
"but could not be attached: %(error)s",
file_name=process["file_name"],
iso_type=iso_type,
error=process_attach["msg"],
),
)
@APP.route("/files/download_url", methods=["POST"])
@ -878,11 +873,13 @@ def download_file():
if process["status"]:
return response(message=process["msg"])
# TODO: Refactor the return messages into one string
return response(error=True, message=[
(_("Failed to download file from %(url)s", url=url), "error"),
(process["msg"], "error"),
])
return response(
error=True,
message=_(
"The following error occurred when downloading: %(error)s",
error=process["msg"],
),
)
@APP.route("/files/upload", methods=["POST"])
@ -911,13 +908,16 @@ def create_file():
"""
Creates an empty image file in the images dir
"""
file_name = request.form.get("file_name")
file_name = Path(request.form.get("file_name"))
size = (int(request.form.get("size")) * 1024 * 1024)
file_type = request.form.get("type")
drive_name = request.form.get("drive_name")
full_file_name = file_name + "." + file_type
process = file_cmd.create_new_image(file_name, file_type, size)
safe_path = is_safe_path(file_name)
if not safe_path["status"]:
return response(error=True, message=safe_path["msg"])
full_file_name = f"{file_name}.{file_type}"
process = file_cmd.create_new_image(str(file_name), file_type, size)
if not process["status"]:
return response(error=True, message=process["msg"])
@ -933,15 +933,20 @@ def create_file():
if not process["status"]:
return response(error=True, message=process["msg"])
return response(
status_code=201,
message=_(
"Image file with properties created: %(file_name)s",
file_name=full_file_name,
),
image=full_file_name,
)
return response(
status_code=201,
# TODO: Refactor the return messages into one string
message=[
(_("Image file created: %(file_name)s", file_name=full_file_name), "success"),
(process["msg"], "success"),
],
message=_("Image file created: %(file_name)s", file_name=full_file_name),
image=full_file_name,
)
)
@APP.route("/files/download", methods=["POST"])
@ -950,9 +955,12 @@ def download():
"""
Downloads a file from the Pi to the local computer
"""
file_name = request.form.get("file")
file_name = Path(request.form.get("file"))
safe_path = is_safe_path(file_name)
if not safe_path["status"]:
return response(error=True, message=safe_path["msg"])
server_info = ractl_cmd.get_server_info()
return send_from_directory(server_info["image_dir"], file_name, as_attachment=True)
return send_from_directory(server_info["image_dir"], str(file_name), as_attachment=True)
@APP.route("/files/delete", methods=["POST"])
@ -961,26 +969,35 @@ def delete():
"""
Deletes a specified file in the images dir
"""
file_name = request.form.get("file_name")
process = file_cmd.delete_image(file_name)
file_name = Path(request.form.get("file_name"))
safe_path = is_safe_path(file_name)
if not safe_path["status"]:
return response(error=True, message=safe_path["msg"])
process = file_cmd.delete_image(str(file_name))
if not process["status"]:
return response(error=True, message=process["msg"])
response_messages = [
(_("Image file deleted: %(file_name)s", file_name=file_name), "success")]
# Delete the drive properties file, if it exists
prop_file_path = Path(CFG_DIR) / f"{file_name}.{PROPERTIES_SUFFIX}"
if prop_file_path.is_file():
process = file_cmd.delete_file(prop_file_path)
process = ReturnCodeMapper.add_msg(process)
if process["status"]:
response_messages.append((process["msg"], "success"))
return response(
message=_(
"Image file with properties deleted: %(file_name)s",
file_name=str(file_name),
),
)
else:
response_messages.append((process["msg"], "error"))
return response(error=True, message=process["msg"])
return response(message=response_messages)
return response(
message=_(
"Image file deleted: %(file_name)s",
file_name=str(file_name),
),
)
@APP.route("/files/rename", methods=["POST"])
@ -989,16 +1006,18 @@ def rename():
"""
Renames a specified file in the images dir
"""
file_name = request.form.get("file_name")
new_file_name = request.form.get("new_file_name")
process = file_cmd.rename_image(file_name, new_file_name)
file_name = Path(request.form.get("file_name"))
new_file_name = Path(request.form.get("new_file_name"))
safe_path = is_safe_path(file_name)
if not safe_path["status"]:
return response(error=True, message=safe_path["msg"])
safe_path = is_safe_path(new_file_name)
if not safe_path["status"]:
return response(error=True, message=safe_path["msg"])
process = file_cmd.rename_image(str(file_name), str(new_file_name))
if not process["status"]:
return response(error=True, message=process["msg"])
response_messages = [
(_("Image file renamed to: %(file_name)s", file_name=new_file_name), "success")]
# Rename the drive properties file, if it exists
prop_file_path = Path(CFG_DIR) / f"{file_name}.{PROPERTIES_SUFFIX}"
new_prop_file_path = Path(CFG_DIR) / f"{new_file_name}.{PROPERTIES_SUFFIX}"
@ -1006,11 +1025,21 @@ def rename():
process = file_cmd.rename_file(prop_file_path, new_prop_file_path)
process = ReturnCodeMapper.add_msg(process)
if process["status"]:
response_messages.append((process["msg"], "success"))
return response(
message=_(
"Image file with properties renamed to: %(file_name)s",
file_name=str(new_file_name),
),
)
else:
response_messages.append((process["msg"], "error"))
return response(error=True, message=process["msg"])
return response(message=response_messages)
return response(
message=_(
"Image file renamed to: %(file_name)s",
file_name=str(new_file_name),
),
)
@APP.route("/files/copy", methods=["POST"])
@ -1019,16 +1048,18 @@ def copy():
"""
Creates a copy of a specified file in the images dir
"""
file_name = request.form.get("file_name")
new_file_name = request.form.get("copy_file_name")
process = file_cmd.copy_image(file_name, new_file_name)
file_name = Path(request.form.get("file_name"))
new_file_name = Path(request.form.get("copy_file_name"))
safe_path = is_safe_path(file_name)
if not safe_path["status"]:
return response(error=True, message=safe_path["msg"])
safe_path = is_safe_path(new_file_name)
if not safe_path["status"]:
return response(error=True, message=safe_path["msg"])
process = file_cmd.copy_image(str(file_name), str(new_file_name))
if not process["status"]:
return response(error=True, message=process["msg"])
response_messages = [
(_("Copy of image file saved as: %(file_name)s", file_name=new_file_name), "success")]
# Create a copy of the drive properties file, if it exists
prop_file_path = Path(CFG_DIR) / f"{file_name}.{PROPERTIES_SUFFIX}"
new_prop_file_path = Path(CFG_DIR) / f"{new_file_name}.{PROPERTIES_SUFFIX}"
@ -1036,11 +1067,21 @@ def copy():
process = file_cmd.copy_file(prop_file_path, new_prop_file_path)
process = ReturnCodeMapper.add_msg(process)
if process["status"]:
response_messages.append((process["msg"], "success"))
return response(
message=_(
"Copy of image file with properties saved as: %(file_name)s",
file_name=str(new_file_name),
),
)
else:
response_messages.append((process["msg"], "error"))
return response(error=True, message=process["msg"])
return response(message=response_messages)
return response(
message=_(
"Copy of image file saved as: %(file_name)s",
file_name=str(new_file_name),
),
)
@APP.route("/files/extract_image", methods=["POST"])
@ -1049,31 +1090,34 @@ def extract_image():
"""
Extracts all or a subset of files in the specified archive
"""
archive_file = request.form.get("archive_file")
archive_file = Path(request.form.get("archive_file"))
archive_members_raw = request.form.get("archive_members") or None
archive_members = archive_members_raw.split("|") if archive_members_raw else None
safe_path = is_safe_path(archive_file)
if not safe_path["status"]:
return response(error=True, message=safe_path["msg"])
extract_result = file_cmd.extract_image(
archive_file,
str(archive_file),
archive_members
)
if extract_result["return_code"] == ReturnCodes.EXTRACTIMAGE_SUCCESS:
response_messages = [(ReturnCodeMapper.add_msg(extract_result).get("msg"), "success")]
for properties_file in extract_result["properties_files_moved"]:
if properties_file["status"]:
response_messages.append((_("Properties file %(file)s moved to %(directory)s",
file=properties_file['name'],
directory=CFG_DIR
), "success"))
logging.info(
"Properties file %s moved to %s",
properties_file["name"],
CFG_DIR,
)
else:
response_messages.append((_("Failed to move properties file %(file)s to %(directory)s",
file=properties_file['name'],
directory=CFG_DIR
), "error"))
logging.warning(
"Failed to move properties file %s to %s",
properties_file["name"],
CFG_DIR,
)
return response(message=response_messages)
return response(message=ReturnCodeMapper.add_msg(extract_result).get("msg"))
return response(error=True, message=ReturnCodeMapper.add_msg(extract_result).get("msg"))

View File

@ -240,29 +240,43 @@ def is_bridge_configured(interface):
Takes (str) interface of a network device being attached.
Returns a (dict) with (bool) status and (str) msg
"""
# TODO: Reduce the nesting of these checks, and streamline how the results are notified
status = True
return_msg = ""
PATH_SYSCTL = "/etc/sysctl.conf"
PATH_IPTV4 = "/etc/iptables/rules.v4"
PATH_DHCPCD = "/etc/dhcpcd.conf"
PATH_BRIDGE = "/etc/network/interfaces.d/rascsi_bridge"
return_msg = _("Configure the network bridge for %(interface)s first: ", interface=interface)
to_configure = []
sys_cmd = SysCmds()
if interface.startswith("wlan"):
if not sys_cmd.introspect_file("/etc/sysctl.conf", r"^net\.ipv4\.ip_forward=1$"):
status = False
return_msg = _("Configure IPv4 forwarding before using a wireless network device.")
elif not Path("/etc/iptables/rules.v4").is_file():
status = False
return_msg = _("Configure NAT before using a wireless network device.")
if not sys_cmd.introspect_file(PATH_SYSCTL, r"^net\.ipv4\.ip_forward=1$"):
to_configure.append("IPv4 forwarding")
if not Path(PATH_IPTV4).is_file():
to_configure.append("NAT")
else:
if not sys_cmd.introspect_file(
"/etc/dhcpcd.conf",
r"^denyinterfaces " + interface + r"$",
):
status = False
return_msg = _("Configure the network bridge before using a wired network device.")
elif not Path("/etc/network/interfaces.d/rascsi_bridge").is_file():
status = False
return_msg = _("Configure the network bridge before using a wired network device.")
if not sys_cmd.introspect_file(PATH_DHCPCD, r"^denyinterfaces " + interface + r"$"):
to_configure.append(PATH_DHCPCD)
if not Path(PATH_BRIDGE).is_file():
to_configure.append(PATH_BRIDGE)
return {"status": status, "msg": return_msg + f" ({interface})"}
if to_configure:
return {"status": False, "msg": return_msg + ", ".join(to_configure)}
return {"status": True, "msg": ""}
def is_safe_path(file_name):
"""
Takes (Path) file_name with the path to a file on the file system
Returns True if the path is safe
Returns False if the path is either absolute, or tries to traverse the file system
"""
if file_name.is_absolute() or ".." in str(file_name):
return {
"status": False,
"msg": _("%(file_name)s is not a valid path", file_name=file_name),
}
return {"status": True, "msg": ""}
def upload_with_dropzonejs(image_dir):

View File

@ -20,7 +20,6 @@ def test_create_file(http_client, list_files, delete_file):
"file_name": file_prefix,
"type": "hds",
"size": 1,
"drive_name": "DEC RZ22",
},
)
@ -36,6 +35,36 @@ def test_create_file(http_client, list_files, delete_file):
delete_file(file_name)
# route("/files/create", methods=["POST"])
def test_create_file_with_properties(http_client, list_files, delete_file):
file_prefix = str(uuid.uuid4())
file_name = f"{file_prefix}.hds"
response = http_client.post(
"/files/create",
data={
"file_name": file_prefix,
"type": "hds",
"size": 1,
"drive_name": "DEC RZ22",
},
)
response_data = response.json()
assert response.status_code == 201
assert response_data["status"] == STATUS_SUCCESS
assert response_data["data"]["image"] == file_name
assert (
response_data["messages"][0]["message"]
== f"Image file with properties created: {file_name}"
)
assert file_name in list_files()
# Cleanup
delete_file(file_name)
# route("/files/rename", methods=["POST"])
def test_rename_file(http_client, create_test_image, list_files, delete_file):
original_file = create_test_image(auto_delete=False)
@ -258,6 +287,7 @@ def test_download_url_to_iso(
http_path = f"/images/{test_file_name}"
url = httpserver.url_for(http_path)
ISO_TYPE = "ISO-9660 Level 1"
with open("tests/assets/test_image.hds", mode="rb") as file:
test_file_data = file.read()
@ -271,7 +301,7 @@ def test_download_url_to_iso(
"/files/download_to_iso",
data={
"scsi_id": SCSI_ID,
"type": "-hfs",
"type": ISO_TYPE,
"url": url,
},
)
@ -283,10 +313,11 @@ def test_download_url_to_iso(
assert iso_file_name in list_files()
assert iso_file_name in list_attached_images()
m = response_data["messages"]
assert m[0]["message"] == 'Created CD-ROM ISO image with arguments "-hfs"'
assert m[1]["message"] == f"Saved image as: {env['images_dir']}/{iso_file_name}"
assert m[2]["message"] == f"Attached to SCSI ID {SCSI_ID}"
assert (
response_data["messages"][0]["message"]
== f"CD-ROM image {iso_file_name} with type {ISO_TYPE} was created "
f"and attached to SCSI ID {SCSI_ID}"
)
# Cleanup
detach_devices()

View File

@ -83,7 +83,10 @@ def test_create_image_with_properties_file(http_client, delete_file):
assert response.status_code == 200
assert response_data["status"] == STATUS_SUCCESS
assert response_data["messages"][0]["message"] == f"Image file created: {file_name}"
assert (
response_data["messages"][0]["message"]
== f"Image file with properties created: {file_name}"
)
# Cleanup
delete_file(file_name)