From 5172d167e750f1f97bbd030e4abaae3cf652ba58 Mon Sep 17 00:00:00 2001 From: Daniel Markstedt Date: Sun, 23 Oct 2022 19:05:29 -0700 Subject: [PATCH] Web UI: More file path sanitation, better network bridge warnings, each endpoint return one message (#932) - Sanitize file paths with Path: for flat file structures, always extract Path().name, and for nested file structures either look for absolute paths, or someone trying to use ".." to traverse the dir strucutre. - Reduce redundancy in network bridge detection method, and return somewhat more informative messages - Make all endpoints return exactly one message - Move some warning messages to logging - Use tempfile for iso generation temp file handling --- python/common/src/rascsi/file_cmds.py | 108 ++++----- python/web/src/templates/index.html | 14 +- python/web/src/web.py | 334 +++++++++++++++----------- python/web/src/web_utils.py | 52 ++-- python/web/tests/api/test_files.py | 43 +++- python/web/tests/api/test_misc.py | 5 +- 6 files changed, 323 insertions(+), 233 deletions(-) diff --git a/python/common/src/rascsi/file_cmds.py b/python/common/src/rascsi/file_cmds.py index 8985c413..0cee0035 100644 --- a/python/common/src/rascsi/file_cmds.py +++ b/python/common/src/rascsi/file_cmds.py @@ -2,17 +2,17 @@ Module for methods reading from and writing to the file system """ -import os import logging import asyncio +from os import path, walk from functools import lru_cache from pathlib import PurePath, Path from zipfile import ZipFile, is_zipfile -from time import time from subprocess import run, CalledProcessError from json import dump, load from shutil import copyfile from urllib.parse import quote +from tempfile import TemporaryDirectory import requests @@ -54,14 +54,14 @@ class FileCmds: index 0 is (str) file name and index 1 is (int) size in bytes """ files_list = [] - for path, _dirs, files in os.walk(dir_path): + for file_path, _dirs, files in walk(dir_path): # Only list selected file types files = [f for f in files if f.lower().endswith(file_types)] files_list.extend( [ ( file, - os.path.getsize(os.path.join(path, file)) + path.getsize(path.join(file_path, file)) ) for file in files ] @@ -75,7 +75,7 @@ class FileCmds: Returns a (list) of (str) files_list """ files_list = [] - for _root, _dirs, files in os.walk(CFG_DIR): + for _root, _dirs, files in walk(CFG_DIR): for file in files: if file.endswith("." + CONFIG_FILE_SUFFIX): files_list.append(file) @@ -375,63 +375,61 @@ class FileCmds: server_info = self.ractl.get_server_info() file_name = PurePath(url).name - tmp_ts = int(time()) - tmp_dir = "/tmp/" + str(tmp_ts) + "/" - os.mkdir(tmp_dir) - tmp_full_path = tmp_dir + file_name - iso_filename = f"{server_info['image_dir']}/{file_name}.iso" + iso_filename = Path(server_info['image_dir']) / f"{file_name}.iso" - req_proc = self.download_to_dir(quote(url, safe=URL_SAFE), tmp_dir, file_name) + with TemporaryDirectory() as tmp_dir: + req_proc = self.download_to_dir(quote(url, safe=URL_SAFE), tmp_dir, file_name) + logging.info("Downloaded %s to %s", file_name, tmp_dir) + if not req_proc["status"]: + return {"status": False, "msg": req_proc["msg"]} - if not req_proc["status"]: - return {"status": False, "msg": req_proc["msg"]} - - if is_zipfile(tmp_full_path): - if "XtraStuf.mac" in str(ZipFile(tmp_full_path).namelist()): - logging.info("MacZip file format detected. Will not unzip to retain resource fork.") - else: - logging.info( - "%s is a zipfile! Will attempt to unzip and store the resulting files.", - tmp_full_path, - ) - unzip_proc = asyncio.run(self.run_async("unzip", [ - "-d", - tmp_dir, - "-n", - tmp_full_path, - ])) - if not unzip_proc["returncode"]: + tmp_full_path = Path(tmp_dir) / file_name + if is_zipfile(tmp_full_path): + if "XtraStuf.mac" in str(ZipFile(str(tmp_full_path)).namelist()): + logging.info("MacZip file format detected. Will not unzip to retain resource fork.") + else: logging.info( - "%s was successfully unzipped. Deleting the zipfile.", + "%s is a zipfile! Will attempt to unzip and store the resulting files.", tmp_full_path, ) - self.delete_file(Path(tmp_full_path)) + unzip_proc = asyncio.run(self.run_async("unzip", [ + "-d", + str(tmp_dir), + "-n", + str(tmp_full_path), + ])) + if not unzip_proc["returncode"]: + logging.info( + "%s was successfully unzipped. Deleting the zipfile.", + tmp_full_path, + ) + tmp_full_path.unlink(True) - try: - run( - [ - "genisoimage", - *iso_args, - "-o", - iso_filename, - tmp_dir, - ], - capture_output=True, - check=True, - ) - except CalledProcessError as error: - logging.warning(SHELL_ERROR, " ".join(error.cmd), error.stderr.decode("utf-8")) - return {"status": False, "msg": error.stderr.decode("utf-8")} + try: + run( + [ + "genisoimage", + *iso_args, + "-o", + str(iso_filename), + tmp_dir, + ], + capture_output=True, + check=True, + ) + except CalledProcessError as error: + logging.warning(SHELL_ERROR, " ".join(error.cmd), error.stderr.decode("utf-8")) + return {"status": False, "msg": error.stderr.decode("utf-8")} - parameters = { - "value": " ".join(iso_args) - } - return { - "status": True, - "return_code": ReturnCodes.DOWNLOADFILETOISO_SUCCESS, - "parameters": parameters, - "file_name": iso_filename, - } + parameters = { + "value": " ".join(iso_args) + } + return { + "status": True, + "return_code": ReturnCodes.DOWNLOADFILETOISO_SUCCESS, + "parameters": parameters, + "file_name": iso_filename.name, + } # noinspection PyMethodMayBeStatic def download_to_dir(self, url, save_dir, file_name): diff --git a/python/web/src/templates/index.html b/python/web/src/templates/index.html index 8715065c..541fba70 100644 --- a/python/web/src/templates/index.html +++ b/python/web/src/templates/index.html @@ -35,7 +35,7 @@

- + .{{ CONFIG_FILE_SUFFIX }}
@@ -522,22 +522,22 @@ diff --git a/python/web/src/web.py b/python/web/src/web.py index 0b1a2485..989a8ca7 100644 --- a/python/web/src/web.py +++ b/python/web/src/web.py @@ -54,6 +54,7 @@ from web_utils import ( get_properties_by_drive_name, auth_active, is_bridge_configured, + is_safe_path, upload_with_dropzonejs, ) from settings import ( @@ -336,7 +337,7 @@ def drive_create(): Creates the image and properties file pair """ drive_name = request.form.get("drive_name") - file_name = request.form.get("file_name") + file_name = Path(request.form.get("file_name")).name properties = get_properties_by_drive_name( APP.config["RASCSI_DRIVE_PROPERTIES"], @@ -360,11 +361,9 @@ def drive_create(): if not process["status"]: return response(error=True, message=process["msg"]) - # TODO: Refactor the return messages into one string - return response(message=[ - (_("Image file created: %(file_name)s", file_name=full_file_name), "success"), - (process["msg"], "success"), - ]) + return response(message= + _("Image file with properties created: %(file_name)s", file_name=full_file_name) + ) @APP.route("/drive/cdrom", methods=["POST"]) @@ -374,7 +373,7 @@ def drive_cdrom(): Creates a properties file for a CD-ROM image """ drive_name = request.form.get("drive_name") - file_name = request.form.get("file_name") + file_name = Path(request.form.get("file_name")).name # Creating the drive properties file file_name = f"{file_name}.{PROPERTIES_SUFFIX}" @@ -396,8 +395,7 @@ def config_save(): """ Saves a config file to disk """ - file_name = request.form.get("name") or "default" - file_name = f"{file_name}.{CONFIG_FILE_SUFFIX}" + file_name = Path(request.form.get("name") + f".{CONFIG_FILE_SUFFIX}").name process = file_cmd.write_config(file_name) process = ReturnCodeMapper.add_msg(process) @@ -413,7 +411,7 @@ def config_load(): """ Loads a config file from disk """ - file_name = request.form.get("name") + file_name = Path(request.form.get("name")).name if "load" in request.form: process = file_cmd.read_config(file_name) @@ -440,18 +438,18 @@ def show_diskinfo(): """ Displays disk image info """ - file_name = request.form.get("file_name") - + file_name = Path(request.form.get("file_name")) + safe_path = is_safe_path(file_name) + if not safe_path["status"]: + return response(error=True, message=safe_path["msg"]) server_info = ractl_cmd.get_server_info() returncode, diskinfo = sys_cmd.get_diskinfo( - server_info["image_dir"] + - "/" + - file_name + Path(server_info["image_dir"]) / file_name ) if returncode == 0: return response( template="diskinfo.html", - file_name=file_name, + file_name=str(file_name), diskinfo=diskinfo, ) @@ -580,17 +578,10 @@ def attach_device(): if param: params.update({item.replace(PARAM_PREFIX, ""): param}) - error_url = "https://github.com/akuker/RASCSI/wiki/Dayna-Port-SCSI-Link" - error_msg = _("Please follow the instructions at %(url)s", url=error_url) - if "interface" in params.keys(): bridge_status = is_bridge_configured(params["interface"]) if not bridge_status["status"]: - # TODO: Refactor the return messages into one string - return response(error=True, message=[ - (bridge_status["msg"], "error"), - (error_msg, "error") - ]) + return response(error=True, message=bridge_status["msg"]) kwargs = { "unit": int(unit), @@ -658,32 +649,26 @@ def attach_image(): process = ractl_cmd.attach_device(scsi_id, **kwargs) process = ReturnCodeMapper.add_msg(process) if process["status"]: - response_messages = [(_( - "Attached %(file_name)s as %(device_type)s to " - "SCSI ID %(id_number)s LUN %(unit_number)s", - file_name=file_name, - device_type=get_device_name(device_type), - id_number=scsi_id, - unit_number=unit, - ), "success")] - if int(file_size) % int(expected_block_size): - response_messages.append((_( - "The image file size %(file_size)s bytes is not a multiple of " - "%(block_size)s. RaSCSI will ignore the trailing data. " + logging.warning( + "The image file size %s bytes is not a multiple of %s. " + "RaSCSI will ignore the trailing data. " "The image may be corrupted, so proceed with caution.", - file_size=file_size, - block_size=expected_block_size, - ), "warning")) + file_size, + expected_block_size, + ) + return response( + message=_( + "Attached %(file_name)s as %(device_type)s to " + "SCSI ID %(id_number)s LUN %(unit_number)s", + file_name=file_name, + device_type=get_device_name(device_type), + id_number=scsi_id, + unit_number=unit, + ) + ) - return response(message=response_messages) - - # TODO: Refactor the return messages into one string - return response(error=True, message=[ - (_("Failed to attach %(file_name)s to SCSI ID %(id_number)s LUN %(unit_number)s", - file_name=file_name, id_number=scsi_id, unit_number=unit), "error"), - (process["msg"], "error"), - ]) + return response(error=True, message=process["msg"]) @APP.route("/scsi/detach_all", methods=["POST"]) @@ -712,12 +697,7 @@ def detach(): return response(message=_("Detached SCSI ID %(id_number)s LUN %(unit_number)s", id_number=scsi_id, unit_number=unit)) - # TODO: Refactor the return messages into one string - return response(error=True, message=[ - (_("Failed to detach SCSI ID %(id_number)s LUN %(unit_number)s", - id_number=scsi_id, unit_number=unit), "error"), - (process["msg"], "error"), - ]) + return response(error=True, message=process["msg"]) @APP.route("/scsi/eject", methods=["POST"]) @@ -734,12 +714,7 @@ def eject(): return response(message=_("Ejected SCSI ID %(id_number)s LUN %(unit_number)s", id_number=scsi_id, unit_number=unit)) - # TODO: Refactor the return messages into one string - return response(error=True, message=[ - (_("Failed to eject SCSI ID %(id_number)s LUN %(unit_number)s", - id_number=scsi_id, unit_number=unit), "error"), - (process["msg"], "error"), - ]) + return response(error=True, message=process["msg"]) @APP.route("/scsi/info", methods=["POST"]) @@ -772,11 +747,7 @@ def reserve_id(): RESERVATIONS[int(scsi_id)] = memo return response(message=_("Reserved SCSI ID %(id_number)s", id_number=scsi_id)) - # TODO: Refactor the return messages into one string - return response(error=True, message=[ - (_("Failed to reserve SCSI ID %(id_number)s", id_number=scsi_id), "error"), - (process["msg"], "error"), - ]) + return response(error=True, message=process["msg"]) @APP.route("/scsi/release", methods=["POST"]) @@ -793,11 +764,7 @@ def release_id(): RESERVATIONS[int(scsi_id)] = "" return response(message=_("Released the reservation for SCSI ID %(id_number)s", id_number=scsi_id)) - # TODO: Refactor the return messages into one string - return response(error=True, message=[ - (_("Failed to release the reservation for SCSI ID %(id_number)s", id_number=scsi_id), "error"), - (process["msg"], "error"), - ]) + return response(error=True, message=process["msg"]) @APP.route("/pi/reboot", methods=["POST"]) @@ -824,24 +791,40 @@ def shutdown(): @login_required def download_to_iso(): """ - Downloads a remote file and creates a CD-ROM image formatted with HFS that contains the file + Downloads a file and creates a CD-ROM image with the specified file system and the file """ scsi_id = request.form.get("scsi_id") url = request.form.get("url") - iso_args = request.form.get("type").split() - response_messages = [] + iso_type = request.form.get("type") + + if iso_type == "HFS": + iso_args = ["-hfs"] + elif iso_type == "ISO-9660 Level 1": + iso_args = ["-iso-level", "1"] + elif iso_type == "ISO-9660 Level 2": + iso_args = ["-iso-level", "2"] + elif iso_type == "ISO-9660 Level 3": + iso_args = ["-iso-level", "3"] + elif iso_type == "Joliet": + iso_args = ["-J"] + elif iso_type == "Rock Ridge": + iso_args = ["-r"] + else: + return response( + error=True, + message=_("%(iso_type)s is not a valid CD-ROM format.", iso_type=iso_type) + ) process = file_cmd.download_file_to_iso(url, *iso_args) process = ReturnCodeMapper.add_msg(process) if not process["status"]: - # TODO: Refactor the return messages into one string - return response(error=True, message=[ - (_("Failed to create CD-ROM image from %(url)s", url=url), "error"), - (process["msg"], "error"), - ]) - - response_messages.append((process["msg"], "success")) - response_messages.append((_("Saved image as: %(file_name)s", file_name=process['file_name']), "success")) + return response( + error=True, + message=_( + "The following error occurred when creating the CD-ROM image: %(error)s", + error=process["msg"], + ), + ) process_attach = ractl_cmd.attach_device( scsi_id, @@ -850,14 +833,26 @@ def download_to_iso(): ) process_attach = ReturnCodeMapper.add_msg(process_attach) if process_attach["status"]: - response_messages.append((_("Attached to SCSI ID %(id_number)s", id_number=scsi_id), "success")) - return response(message=response_messages) + return response( + message=_( + "CD-ROM image %(file_name)s with type %(iso_type)s was created " + "and attached to SCSI ID %(id_number)s", + file_name=process["file_name"], + iso_type=iso_type, + id_number=scsi_id, + ), + ) - # TODO: Refactor the return messages into one string - return response(error=True, message=[ - (_("Failed to attach image to SCSI ID %(id_number)s. Try attaching it manually.", id_number=scsi_id), "error"), - (process_attach["msg"], "error"), - ]) + return response( + error=True, + message=_( + "CD-ROM image %(file_name)s with type %(iso_type)s was created " + "but could not be attached: %(error)s", + file_name=process["file_name"], + iso_type=iso_type, + error=process_attach["msg"], + ), + ) @APP.route("/files/download_url", methods=["POST"]) @@ -878,11 +873,13 @@ def download_file(): if process["status"]: return response(message=process["msg"]) - # TODO: Refactor the return messages into one string - return response(error=True, message=[ - (_("Failed to download file from %(url)s", url=url), "error"), - (process["msg"], "error"), - ]) + return response( + error=True, + message=_( + "The following error occurred when downloading: %(error)s", + error=process["msg"], + ), + ) @APP.route("/files/upload", methods=["POST"]) @@ -911,13 +908,16 @@ def create_file(): """ Creates an empty image file in the images dir """ - file_name = request.form.get("file_name") + file_name = Path(request.form.get("file_name")) size = (int(request.form.get("size")) * 1024 * 1024) file_type = request.form.get("type") drive_name = request.form.get("drive_name") - full_file_name = file_name + "." + file_type - process = file_cmd.create_new_image(file_name, file_type, size) + safe_path = is_safe_path(file_name) + if not safe_path["status"]: + return response(error=True, message=safe_path["msg"]) + full_file_name = f"{file_name}.{file_type}" + process = file_cmd.create_new_image(str(file_name), file_type, size) if not process["status"]: return response(error=True, message=process["msg"]) @@ -933,15 +933,20 @@ def create_file(): if not process["status"]: return response(error=True, message=process["msg"]) + return response( + status_code=201, + message=_( + "Image file with properties created: %(file_name)s", + file_name=full_file_name, + ), + image=full_file_name, + ) + return response( status_code=201, - # TODO: Refactor the return messages into one string - message=[ - (_("Image file created: %(file_name)s", file_name=full_file_name), "success"), - (process["msg"], "success"), - ], + message=_("Image file created: %(file_name)s", file_name=full_file_name), image=full_file_name, - ) + ) @APP.route("/files/download", methods=["POST"]) @@ -950,9 +955,12 @@ def download(): """ Downloads a file from the Pi to the local computer """ - file_name = request.form.get("file") + file_name = Path(request.form.get("file")) + safe_path = is_safe_path(file_name) + if not safe_path["status"]: + return response(error=True, message=safe_path["msg"]) server_info = ractl_cmd.get_server_info() - return send_from_directory(server_info["image_dir"], file_name, as_attachment=True) + return send_from_directory(server_info["image_dir"], str(file_name), as_attachment=True) @APP.route("/files/delete", methods=["POST"]) @@ -961,26 +969,35 @@ def delete(): """ Deletes a specified file in the images dir """ - file_name = request.form.get("file_name") - - process = file_cmd.delete_image(file_name) + file_name = Path(request.form.get("file_name")) + safe_path = is_safe_path(file_name) + if not safe_path["status"]: + return response(error=True, message=safe_path["msg"]) + process = file_cmd.delete_image(str(file_name)) if not process["status"]: return response(error=True, message=process["msg"]) - response_messages = [ - (_("Image file deleted: %(file_name)s", file_name=file_name), "success")] - # Delete the drive properties file, if it exists prop_file_path = Path(CFG_DIR) / f"{file_name}.{PROPERTIES_SUFFIX}" if prop_file_path.is_file(): process = file_cmd.delete_file(prop_file_path) process = ReturnCodeMapper.add_msg(process) if process["status"]: - response_messages.append((process["msg"], "success")) + return response( + message=_( + "Image file with properties deleted: %(file_name)s", + file_name=str(file_name), + ), + ) else: - response_messages.append((process["msg"], "error")) + return response(error=True, message=process["msg"]) - return response(message=response_messages) + return response( + message=_( + "Image file deleted: %(file_name)s", + file_name=str(file_name), + ), + ) @APP.route("/files/rename", methods=["POST"]) @@ -989,16 +1006,18 @@ def rename(): """ Renames a specified file in the images dir """ - file_name = request.form.get("file_name") - new_file_name = request.form.get("new_file_name") - - process = file_cmd.rename_image(file_name, new_file_name) + file_name = Path(request.form.get("file_name")) + new_file_name = Path(request.form.get("new_file_name")) + safe_path = is_safe_path(file_name) + if not safe_path["status"]: + return response(error=True, message=safe_path["msg"]) + safe_path = is_safe_path(new_file_name) + if not safe_path["status"]: + return response(error=True, message=safe_path["msg"]) + process = file_cmd.rename_image(str(file_name), str(new_file_name)) if not process["status"]: return response(error=True, message=process["msg"]) - response_messages = [ - (_("Image file renamed to: %(file_name)s", file_name=new_file_name), "success")] - # Rename the drive properties file, if it exists prop_file_path = Path(CFG_DIR) / f"{file_name}.{PROPERTIES_SUFFIX}" new_prop_file_path = Path(CFG_DIR) / f"{new_file_name}.{PROPERTIES_SUFFIX}" @@ -1006,11 +1025,21 @@ def rename(): process = file_cmd.rename_file(prop_file_path, new_prop_file_path) process = ReturnCodeMapper.add_msg(process) if process["status"]: - response_messages.append((process["msg"], "success")) + return response( + message=_( + "Image file with properties renamed to: %(file_name)s", + file_name=str(new_file_name), + ), + ) else: - response_messages.append((process["msg"], "error")) + return response(error=True, message=process["msg"]) - return response(message=response_messages) + return response( + message=_( + "Image file renamed to: %(file_name)s", + file_name=str(new_file_name), + ), + ) @APP.route("/files/copy", methods=["POST"]) @@ -1019,16 +1048,18 @@ def copy(): """ Creates a copy of a specified file in the images dir """ - file_name = request.form.get("file_name") - new_file_name = request.form.get("copy_file_name") - - process = file_cmd.copy_image(file_name, new_file_name) + file_name = Path(request.form.get("file_name")) + new_file_name = Path(request.form.get("copy_file_name")) + safe_path = is_safe_path(file_name) + if not safe_path["status"]: + return response(error=True, message=safe_path["msg"]) + safe_path = is_safe_path(new_file_name) + if not safe_path["status"]: + return response(error=True, message=safe_path["msg"]) + process = file_cmd.copy_image(str(file_name), str(new_file_name)) if not process["status"]: return response(error=True, message=process["msg"]) - response_messages = [ - (_("Copy of image file saved as: %(file_name)s", file_name=new_file_name), "success")] - # Create a copy of the drive properties file, if it exists prop_file_path = Path(CFG_DIR) / f"{file_name}.{PROPERTIES_SUFFIX}" new_prop_file_path = Path(CFG_DIR) / f"{new_file_name}.{PROPERTIES_SUFFIX}" @@ -1036,11 +1067,21 @@ def copy(): process = file_cmd.copy_file(prop_file_path, new_prop_file_path) process = ReturnCodeMapper.add_msg(process) if process["status"]: - response_messages.append((process["msg"], "success")) + return response( + message=_( + "Copy of image file with properties saved as: %(file_name)s", + file_name=str(new_file_name), + ), + ) else: - response_messages.append((process["msg"], "error")) + return response(error=True, message=process["msg"]) - return response(message=response_messages) + return response( + message=_( + "Copy of image file saved as: %(file_name)s", + file_name=str(new_file_name), + ), + ) @APP.route("/files/extract_image", methods=["POST"]) @@ -1049,31 +1090,34 @@ def extract_image(): """ Extracts all or a subset of files in the specified archive """ - archive_file = request.form.get("archive_file") + archive_file = Path(request.form.get("archive_file")) archive_members_raw = request.form.get("archive_members") or None archive_members = archive_members_raw.split("|") if archive_members_raw else None - + safe_path = is_safe_path(archive_file) + if not safe_path["status"]: + return response(error=True, message=safe_path["msg"]) extract_result = file_cmd.extract_image( - archive_file, + str(archive_file), archive_members ) if extract_result["return_code"] == ReturnCodes.EXTRACTIMAGE_SUCCESS: - response_messages = [(ReturnCodeMapper.add_msg(extract_result).get("msg"), "success")] for properties_file in extract_result["properties_files_moved"]: if properties_file["status"]: - response_messages.append((_("Properties file %(file)s moved to %(directory)s", - file=properties_file['name'], - directory=CFG_DIR - ), "success")) + logging.info( + "Properties file %s moved to %s", + properties_file["name"], + CFG_DIR, + ) else: - response_messages.append((_("Failed to move properties file %(file)s to %(directory)s", - file=properties_file['name'], - directory=CFG_DIR - ), "error")) + logging.warning( + "Failed to move properties file %s to %s", + properties_file["name"], + CFG_DIR, + ) - return response(message=response_messages) + return response(message=ReturnCodeMapper.add_msg(extract_result).get("msg")) return response(error=True, message=ReturnCodeMapper.add_msg(extract_result).get("msg")) diff --git a/python/web/src/web_utils.py b/python/web/src/web_utils.py index 8efd3589..4c6d5ba5 100644 --- a/python/web/src/web_utils.py +++ b/python/web/src/web_utils.py @@ -240,29 +240,43 @@ def is_bridge_configured(interface): Takes (str) interface of a network device being attached. Returns a (dict) with (bool) status and (str) msg """ - # TODO: Reduce the nesting of these checks, and streamline how the results are notified - status = True - return_msg = "" + PATH_SYSCTL = "/etc/sysctl.conf" + PATH_IPTV4 = "/etc/iptables/rules.v4" + PATH_DHCPCD = "/etc/dhcpcd.conf" + PATH_BRIDGE = "/etc/network/interfaces.d/rascsi_bridge" + return_msg = _("Configure the network bridge for %(interface)s first: ", interface=interface) + to_configure = [] sys_cmd = SysCmds() if interface.startswith("wlan"): - if not sys_cmd.introspect_file("/etc/sysctl.conf", r"^net\.ipv4\.ip_forward=1$"): - status = False - return_msg = _("Configure IPv4 forwarding before using a wireless network device.") - elif not Path("/etc/iptables/rules.v4").is_file(): - status = False - return_msg = _("Configure NAT before using a wireless network device.") + if not sys_cmd.introspect_file(PATH_SYSCTL, r"^net\.ipv4\.ip_forward=1$"): + to_configure.append("IPv4 forwarding") + if not Path(PATH_IPTV4).is_file(): + to_configure.append("NAT") else: - if not sys_cmd.introspect_file( - "/etc/dhcpcd.conf", - r"^denyinterfaces " + interface + r"$", - ): - status = False - return_msg = _("Configure the network bridge before using a wired network device.") - elif not Path("/etc/network/interfaces.d/rascsi_bridge").is_file(): - status = False - return_msg = _("Configure the network bridge before using a wired network device.") + if not sys_cmd.introspect_file(PATH_DHCPCD, r"^denyinterfaces " + interface + r"$"): + to_configure.append(PATH_DHCPCD) + if not Path(PATH_BRIDGE).is_file(): + to_configure.append(PATH_BRIDGE) - return {"status": status, "msg": return_msg + f" ({interface})"} + if to_configure: + return {"status": False, "msg": return_msg + ", ".join(to_configure)} + + return {"status": True, "msg": ""} + + +def is_safe_path(file_name): + """ + Takes (Path) file_name with the path to a file on the file system + Returns True if the path is safe + Returns False if the path is either absolute, or tries to traverse the file system + """ + if file_name.is_absolute() or ".." in str(file_name): + return { + "status": False, + "msg": _("%(file_name)s is not a valid path", file_name=file_name), + } + + return {"status": True, "msg": ""} def upload_with_dropzonejs(image_dir): diff --git a/python/web/tests/api/test_files.py b/python/web/tests/api/test_files.py index f538076c..b36d7f5f 100644 --- a/python/web/tests/api/test_files.py +++ b/python/web/tests/api/test_files.py @@ -20,7 +20,6 @@ def test_create_file(http_client, list_files, delete_file): "file_name": file_prefix, "type": "hds", "size": 1, - "drive_name": "DEC RZ22", }, ) @@ -36,6 +35,36 @@ def test_create_file(http_client, list_files, delete_file): delete_file(file_name) +# route("/files/create", methods=["POST"]) +def test_create_file_with_properties(http_client, list_files, delete_file): + file_prefix = str(uuid.uuid4()) + file_name = f"{file_prefix}.hds" + + response = http_client.post( + "/files/create", + data={ + "file_name": file_prefix, + "type": "hds", + "size": 1, + "drive_name": "DEC RZ22", + }, + ) + + response_data = response.json() + + assert response.status_code == 201 + assert response_data["status"] == STATUS_SUCCESS + assert response_data["data"]["image"] == file_name + assert ( + response_data["messages"][0]["message"] + == f"Image file with properties created: {file_name}" + ) + assert file_name in list_files() + + # Cleanup + delete_file(file_name) + + # route("/files/rename", methods=["POST"]) def test_rename_file(http_client, create_test_image, list_files, delete_file): original_file = create_test_image(auto_delete=False) @@ -258,6 +287,7 @@ def test_download_url_to_iso( http_path = f"/images/{test_file_name}" url = httpserver.url_for(http_path) + ISO_TYPE = "ISO-9660 Level 1" with open("tests/assets/test_image.hds", mode="rb") as file: test_file_data = file.read() @@ -271,7 +301,7 @@ def test_download_url_to_iso( "/files/download_to_iso", data={ "scsi_id": SCSI_ID, - "type": "-hfs", + "type": ISO_TYPE, "url": url, }, ) @@ -283,10 +313,11 @@ def test_download_url_to_iso( assert iso_file_name in list_files() assert iso_file_name in list_attached_images() - m = response_data["messages"] - assert m[0]["message"] == 'Created CD-ROM ISO image with arguments "-hfs"' - assert m[1]["message"] == f"Saved image as: {env['images_dir']}/{iso_file_name}" - assert m[2]["message"] == f"Attached to SCSI ID {SCSI_ID}" + assert ( + response_data["messages"][0]["message"] + == f"CD-ROM image {iso_file_name} with type {ISO_TYPE} was created " + f"and attached to SCSI ID {SCSI_ID}" + ) # Cleanup detach_devices() diff --git a/python/web/tests/api/test_misc.py b/python/web/tests/api/test_misc.py index 5b6272f5..ba8f1836 100644 --- a/python/web/tests/api/test_misc.py +++ b/python/web/tests/api/test_misc.py @@ -83,7 +83,10 @@ def test_create_image_with_properties_file(http_client, delete_file): assert response.status_code == 200 assert response_data["status"] == STATUS_SUCCESS - assert response_data["messages"][0]["message"] == f"Image file created: {file_name}" + assert ( + response_data["messages"][0]["message"] + == f"Image file with properties created: {file_name}" + ) # Cleanup delete_file(file_name)