From edf65a8768685f1e66cd11299985c66489e2ff69 Mon Sep 17 00:00:00 2001 From: nucleogenic Date: Sat, 10 Sep 2022 02:58:35 +0100 Subject: [PATCH 01/16] Update RaCtlCmds methods to return JSON serializable results --- python/common/src/rascsi/ractl_cmds.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/python/common/src/rascsi/ractl_cmds.py b/python/common/src/rascsi/ractl_cmds.py index 4cc9c37a..25a001e0 100644 --- a/python/common/src/rascsi/ractl_cmds.py +++ b/python/common/src/rascsi/ractl_cmds.py @@ -40,7 +40,7 @@ class RaCtlCmds: version = (str(result.server_info.version_info.major_version) + "." + str(result.server_info.version_info.minor_version) + "." + str(result.server_info.version_info.patch_version)) - log_levels = result.server_info.log_level_info.log_levels + log_levels = list(result.server_info.log_level_info.log_levels) current_log_level = result.server_info.log_level_info.current_log_level reserved_ids = list(result.server_info.reserved_ids_info.ids) image_dir = result.server_info.image_files_info.default_image_folder @@ -113,7 +113,7 @@ class RaCtlCmds: result = proto.PbResult() result.ParseFromString(data) ifs = result.network_interfaces_info.name - return {"status": result.status, "ifs": ifs} + return {"status": result.status, "ifs": list(ifs)} def get_device_types(self): """ @@ -140,7 +140,7 @@ class RaCtlCmds: "removable": device.properties.removable, "supports_file": device.properties.supports_file, "params": params, - "block_sizes": device.properties.block_sizes, + "block_sizes": list(device.properties.block_sizes), } return {"status": result.status, "device_types": device_types} @@ -394,7 +394,7 @@ class RaCtlCmds: dpath = result.devices_info.devices[i].file.name dfile = dpath.replace(image_files_info["images_dir"] + "/", "") - dparam = result.devices_info.devices[i].params + dparam = dict(result.devices_info.devices[i].params) dven = result.devices_info.devices[i].vendor dprod = result.devices_info.devices[i].product drev = result.devices_info.devices[i].revision From 4ef2e20b4fe1ea42cfd1bac1f192fd60e25a095d Mon Sep 17 00:00:00 2001 From: nucleogenic Date: Sat, 10 Sep 2022 02:58:47 +0100 Subject: [PATCH 02/16] Update get_supported_locales to return a JSON serializable result --- python/web/src/web.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/python/web/src/web.py b/python/web/src/web.py index 9c567763..288113fb 100644 --- a/python/web/src/web.py +++ b/python/web/src/web.py @@ -89,10 +89,12 @@ def get_supported_locales(): """ Returns a list of Locale objects that the Web Interfaces supports """ - locales = BABEL.list_translations() - locales.append(Locale("en")) - sorted_locales = sorted(locales, key=lambda x: x.language) - return sorted_locales + locales = [ + {"language": x.language, "display_name": x.display_name} + for x in [*BABEL.list_translations(), Locale("en")] + ] + + return sorted(locales, key=lambda x: x["language"]) # pylint: disable=too-many-locals From 1e9a7d22839601264d8b96c6365d820638979de7 Mon Sep 17 00:00:00 2001 From: nucleogenic Date: Thu, 22 Sep 2022 01:20:22 +0100 Subject: [PATCH 03/16] Move flattening of file type lists to template layer --- python/web/src/templates/index.html | 12 ++++++++++-- python/web/src/web.py | 4 ++-- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/python/web/src/templates/index.html b/python/web/src/templates/index.html index 4eb43528..580d46d5 100644 --- a/python/web/src/templates/index.html +++ b/python/web/src/templates/index.html @@ -156,8 +156,16 @@
  • {{ _("Select a valid SCSI ID and LUN to attach to. Unless you know what you're doing, always use LUN 0.", url="https://en.wikipedia.org/wiki/Logical_unit_number") }}
  • {{ _("If RaSCSI was unable to detect the media type associated with the image, you get to choose the type from the dropdown.") }}
  • -
  • {{ _("Recognized image file types: %(valid_image_suffixes)s", valid_image_suffixes=valid_image_suffixes) }}
  • -
  • {{ _("Recognized archive file types: %(ARCHIVE_FILE_SUFFIXES)s", ARCHIVE_FILE_SUFFIXES=ARCHIVE_FILE_SUFFIXES) }}
  • +
  • + {{ _("Recognized image file types:") }} + {% set comma = joiner(", ") %} + {% for extension in valid_image_suffixes %}{{ comma() }}.{{ extension}}{% endfor %} +
  • +
  • + {{ _("Recognized archive file types:") }} + {% set comma = joiner(", ") %} + {% for extension in ARCHIVE_FILE_SUFFIXES %}{{ comma() }}.{{ extension}}{% endfor %} +
  • diff --git a/python/web/src/web.py b/python/web/src/web.py index 288113fb..41db5e2a 100644 --- a/python/web/src/web.py +++ b/python/web/src/web.py @@ -149,7 +149,7 @@ def index(): server_info["scmo"] ) - valid_image_suffixes = "." + ", .".join( + valid_image_suffixes = ( server_info["schd"] + server_info["scrm"] + server_info["scmo"] + @@ -198,7 +198,7 @@ def index(): username=username, auth_active=auth_active(AUTH_GROUP)["status"], PROPERTIES_SUFFIX=PROPERTIES_SUFFIX, - ARCHIVE_FILE_SUFFIXES="." + ", .".join(ARCHIVE_FILE_SUFFIXES), + ARCHIVE_FILE_SUFFIXES=ARCHIVE_FILE_SUFFIXES, REMOVABLE_DEVICE_TYPES=ractl_cmd.get_removable_device_types(), DISK_DEVICE_TYPES=ractl_cmd.get_disk_device_types(), PERIPHERAL_DEVICE_TYPES=ractl_cmd.get_peripheral_device_types(), From fb8f306005d9ace7038420b07590a09d107d339d Mon Sep 17 00:00:00 2001 From: nucleogenic Date: Mon, 26 Sep 2022 00:00:18 +0100 Subject: [PATCH 04/16] Implement response generator for HTML and JSON requests Supporting updates to web.py and templates --- python/web/src/static/style.css | 31 +- python/web/src/templates/base.html | 18 +- python/web/src/templates/drives.html | 2 +- python/web/src/templates/index.html | 2 +- python/web/src/web.py | 489 +++++++++++++++------------ 5 files changed, 298 insertions(+), 244 deletions(-) diff --git a/python/web/src/static/style.css b/python/web/src/static/style.css index 48fc91c6..202b56d2 100644 --- a/python/web/src/static/style.css +++ b/python/web/src/static/style.css @@ -31,18 +31,33 @@ table, tr, td { margin: none; } -.error { - color: white; - font-size:20px; - background-color:red; - white-space: pre-line; +div.flash { + margin-top: 5px; + margin-bottom: 5px; } -.message { +div.flash div { color: white; - font-size:20px; - background-color:green; + font-size: 18px; white-space: pre-line; + padding: 2px 5px; +} + +div.flash div.success { + background-color: green; +} + +div.flash div.warning { + background-color: orange; + color: black; +} + +div.flash div.error { + background-color: red; +} + +div.flash div.info { + background-color: #0d6efd; } td.inactive { diff --git a/python/web/src/templates/base.html b/python/web/src/templates/base.html index ce8553e1..2eb304de 100644 --- a/python/web/src/templates/base.html +++ b/python/web/src/templates/base.html @@ -1,7 +1,7 @@ - {{ _("RaSCSI Reloaded Control Page") }} [{{ host }}] + {{ _("RaSCSI Reloaded Control Page") }} [{{ env["host"] }}] @@ -26,12 +26,12 @@ @@ -43,9 +43,9 @@
    - {% if auth_active %} - {% if username %} - {{ _("Logged in as %(username)s", username=username) }} – {{ _("Log Out") }} + {% if env["auth_active"] %} + {% if env["username"] %} + {{ _("Logged in as %(username)s", username=env["username"]) }} – {{ _("Log Out") }} {% else %}
    @@ -70,7 +70,7 @@ - hostname: {{ host }} ip: {{ ip_addr }} + hostname: {{ env["host"] }} ip: {{ env["ip_addr"] }} @@ -89,8 +89,8 @@ {% block content %}{% endblock content %}
    diff --git a/python/web/src/templates/drives.html b/python/web/src/templates/drives.html index 76393faf..a9cdad95 100644 --- a/python/web/src/templates/drives.html +++ b/python/web/src/templates/drives.html @@ -135,7 +135,7 @@ {% endfor %} -

    {{ _("%(disk_space)s MiB disk space remaining on the Pi", disk_space=free_disk) }}

    +

    {{ _("%(disk_space)s MiB disk space remaining on the Pi", disk_space=env["free_disk_space"]) }}

    {{ _("Cancel") }}

    {% endblock content %} diff --git a/python/web/src/templates/index.html b/python/web/src/templates/index.html index 580d46d5..d0ed8ef7 100644 --- a/python/web/src/templates/index.html +++ b/python/web/src/templates/index.html @@ -309,7 +309,7 @@ {% endfor %} -

    {{ _("%(disk_space)s MiB disk space remaining on the Pi", disk_space=free_disk) }}

    +

    {{ _("%(disk_space)s MiB disk space remaining on the Pi", disk_space=env["free_disk_space"]) }}


    diff --git a/python/web/src/web.py b/python/web/src/web.py index 41db5e2a..918d7310 100644 --- a/python/web/src/web.py +++ b/python/web/src/web.py @@ -27,6 +27,7 @@ from flask import ( make_response, session, abort, + jsonify, ) from rascsi.ractl_cmds import RaCtlCmds @@ -67,6 +68,66 @@ from settings import ( APP = Flask(__name__) BABEL = Babel(APP) +def get_env_info(): + """ + Get information about the app/host environment + """ + ip_addr, host = sys_cmd.get_ip_and_host() + + if "username" in session: + username = session["username"] + else: + username = None + + return { + "running_env": sys_cmd.running_env(), + "username": username, + "auth_active": auth_active(AUTH_GROUP)["status"], + "ip_addr": ip_addr, + "host": host, + "free_disk_space": int(sys_cmd.disk_space()["free"] / 1024 / 1024), + } + + +def response( + template=None, + message=None, + redirect_url=None, + error=False, + status_code=200, + **kwargs +): + """ + Generates a HTML or JSON HTTP response + """ + status = "error" if error else "success" + + if isinstance(message, list): + messages = message + elif message is None: + messages = [] + else: + messages = [(str(message), status)] + + if request.headers.get("accept") == "application/json": + return jsonify({ + "status": status, + "messages": [{"message": m, "category": c} for m, c in messages], + "data": kwargs + }), status_code + + if messages: + for message, category in messages: + flash(message, category) + + if template: + kwargs["env"] = get_env_info() + return render_template(template, **kwargs) + + if redirect_url: + return redirect(url_for(redirect_url)) + return redirect(url_for("index")) + @BABEL.localeselector def get_locale(): @@ -161,14 +222,12 @@ def index(): else: username = None - return render_template( - "index.html", + return response( + template="index.html", locales=get_supported_locales(), bridge_configured=sys_cmd.is_bridge_setup(), netatalk_configured=sys_cmd.running_proc("afpd"), macproxy_configured=sys_cmd.running_proc("macproxy"), - ip_addr=ip_addr, - host=host, devices=formatted_devices, files=extended_image_files, config_files=config_files, @@ -183,20 +242,16 @@ def index(): reserved_scsi_ids=reserved_scsi_ids, RESERVATIONS=RESERVATIONS, max_file_size=int(int(MAX_FILE_SIZE) / 1024 / 1024), - running_env=sys_cmd.running_env(), version=server_info["version"], log_levels=server_info["log_levels"], current_log_level=server_info["current_log_level"], netinfo=ractl_cmd.get_network_info(), device_types=device_types, - free_disk=int(sys_cmd.disk_space()["free"] / 1024 / 1024), image_suffixes_to_create=image_suffixes_to_create, valid_image_suffixes=valid_image_suffixes, cdrom_file_suffix=tuple(server_info["sccd"]), removable_file_suffix=tuple(server_info["scrm"]), mo_file_suffix=tuple(server_info["scmo"]), - username=username, - auth_active=auth_active(AUTH_GROUP)["status"], PROPERTIES_SUFFIX=PROPERTIES_SUFFIX, ARCHIVE_FILE_SUFFIXES=ARCHIVE_FILE_SUFFIXES, REMOVABLE_DEVICE_TYPES=ractl_cmd.get_removable_device_types(), @@ -213,23 +268,20 @@ def drive_list(): # Reads the canonical drive properties into a dict # The file resides in the current dir of the web ui process drive_properties = Path(DRIVE_PROPERTIES_FILE) - if drive_properties.is_file(): - process = file_cmd.read_drive_properties(str(drive_properties)) - process = ReturnCodeMapper.add_msg(process) - if not process["status"]: - flash(process["msg"], "error") - return redirect(url_for("index")) - conf = process["conf"] - else: - flash( - _( - "Could not read drive properties from %(properties_file)s", - properties_file=drive_properties, - ), - "error", - ) - return redirect(url_for("index")) + if not drive_properties.is_file(): + return response( + error=True, + message=_("Could not read drive properties from %(properties_file)s", + properties_file=drive_properties), + ) + process = file_cmd.read_drive_properties(str(drive_properties)) + process = ReturnCodeMapper.add_msg(process) + + if not process["status"]: + return response(error=True, message=process["msg"]) + + conf = process["conf"] hd_conf = [] cd_conf = [] rm_conf = [] @@ -247,30 +299,18 @@ def drive_list(): device["size_mb"] = "{:,.2f}".format(device["size"] / 1024 / 1024) rm_conf.append(device) - if "username" in session: - username = session["username"] - else: - username = None - server_info = ractl_cmd.get_server_info() - ip_addr, host = sys_cmd.get_ip_and_host() - return render_template( + return response( "drives.html", files=file_cmd.list_images()["files"], base_dir=server_info["image_dir"], hd_conf=hd_conf, cd_conf=cd_conf, rm_conf=rm_conf, - running_env=sys_cmd.running_env(), version=server_info["version"], - free_disk=int(sys_cmd.disk_space()["free"] / 1024 / 1024), cdrom_file_suffix=tuple(server_info["sccd"]), - username=username, - auth_active=auth_active(AUTH_GROUP)["status"], - ip_addr=ip_addr, - host=host, - ) + ) @APP.route("/login", methods=["POST"]) @@ -280,20 +320,17 @@ def login(): """ username = request.form["username"] password = request.form["password"] - groups = [g.gr_name for g in getgrall() if username in g.gr_mem] + if AUTH_GROUP in groups: if authenticate(str(username), str(password)): session["username"] = request.form["username"] - return redirect(url_for("index")) - flash( - _( - "You must log in with credentials for a user in the '%(group)s' group", - group=AUTH_GROUP, - ), - "error", - ) - return redirect(url_for("index")) + return response() + + return response(error=True, status_code=401, message=_( + "You must log in with valid credentials for a user in the '%(group)s' group", + group=AUTH_GROUP, + )) @APP.route("/logout") @@ -302,7 +339,7 @@ def logout(): Removes the logged in user from the session """ session.pop("username", None) - return redirect(url_for("index")) + return response() @APP.route("/pwa/") @@ -321,8 +358,7 @@ def login_required(func): def decorated_function(*args, **kwargs): auth = auth_active(AUTH_GROUP) if auth["status"] and "username" not in session: - flash(auth["msg"], "error") - return redirect(url_for("index")) + return response(error=True, message=auth["msg"]) return func(*args, **kwargs) return decorated_function @@ -344,11 +380,8 @@ def drive_create(): # Creating the image file process = file_cmd.create_new_image(file_name, file_type, size) - if process["status"]: - flash(_("Image file created: %(file_name)s", file_name=full_file_name)) - else: - flash(process["msg"], "error") - return redirect(url_for("index")) + if not process["status"]: + return response(error=True, message=process["msg"]) # Creating the drive properties file prop_file_name = f"{file_name}.{file_type}.{PROPERTIES_SUFFIX}" @@ -360,12 +393,10 @@ def drive_create(): } process = file_cmd.write_drive_properties(prop_file_name, properties) process = ReturnCodeMapper.add_msg(process) - if process["status"]: - flash(process["msg"]) - return redirect(url_for("index")) + if not process["status"]: + return response(error=True, message=process["msg"]) - flash(process['msg'], "error") - return redirect(url_for("index")) + return response(message=_("Image file created: %(file_name)s", file_name=full_file_name)) @APP.route("/drive/cdrom", methods=["POST"]) @@ -391,11 +422,9 @@ def drive_cdrom(): process = file_cmd.write_drive_properties(file_name, properties) process = ReturnCodeMapper.add_msg(process) if process["status"]: - flash(process["msg"]) - return redirect(url_for("index")) + return response(message=process["msg"]) - flash(process['msg'], "error") - return redirect(url_for("index")) + return response(error=True, message=process["msg"]) @APP.route("/config/save", methods=["POST"]) @@ -410,11 +439,9 @@ def config_save(): process = file_cmd.write_config(file_name) process = ReturnCodeMapper.add_msg(process) if process["status"]: - flash(process["msg"]) - return redirect(url_for("index")) + return response(message=process["msg"]) - flash(process['msg'], "error") - return redirect(url_for("index")) + return response(error=True, message=process["msg"]) @APP.route("/config/load", methods=["POST"]) @@ -429,24 +456,19 @@ def config_load(): process = file_cmd.read_config(file_name) process = ReturnCodeMapper.add_msg(process) if process["status"]: - flash(process["msg"]) - return redirect(url_for("index")) + return response(message=process["msg"]) + + return response(error=True, message=process["msg"]) - flash(process['msg'], "error") - return redirect(url_for("index")) if "delete" in request.form: process = file_cmd.delete_file(f"{CFG_DIR}/{file_name}") process = ReturnCodeMapper.add_msg(process) if process["status"]: - flash(process["msg"]) - return redirect(url_for("index")) + return response(message=process["msg"]) - flash(process['msg'], "error") - return redirect(url_for("index")) + return response(error=True, message=process["msg"]) - # The only reason we would reach here would be a Web UI bug. Will not localize. - flash("Got an unhandled request (needs to be either load or delete)", "error") - return redirect(url_for("index")) + return response(error=True, message="Action field (load, delete) missing") @APP.route("/logs/show", methods=["POST"]) @@ -457,14 +479,16 @@ def show_logs(): lines = request.form.get("lines") scope = request.form.get("scope") + # TODO: Render logs in template returncode, logs = sys_cmd.get_logs(lines, scope) if returncode == 0: headers = {"content-type": "text/plain"} - return logs, int(lines), headers + return logs, headers - flash(_("An error occurred when fetching logs.")) - flash(logs, "stderr") - return redirect(url_for("index")) + return response(error=True, message=[ + (_("An error occurred when fetching logs."), "error"), + (logs, "stderr"), + ]) @APP.route("/logs/level", methods=["POST"]) @@ -477,11 +501,9 @@ def log_level(): process = ractl_cmd.set_log_level(level) if process["status"]: - flash(_("Log level set to %(value)s", value=level)) - return redirect(url_for("index")) + return response(message=_("Log level set to %(value)s", value=level)) - flash(process["msg"], "error") - return redirect(url_for("index")) + return response(error=True, message=process["msg"]) @APP.route("/scsi/attach_device", methods=["POST"]) @@ -507,11 +529,14 @@ def attach_device(): error_msg = _("Please follow the instructions at %(url)s", url=error_url) if "interface" in params.keys(): + # TODO: Can the behaviour of this function be made more intuitive? bridge_status = is_bridge_configured(params["interface"]) + # Error condition is truthy if bridge_status: - flash(bridge_status, "error") - flash(error_msg, "error") - return redirect(url_for("index")) + return response(error=True, message=[ + (bridge_status, "error"), + (error_msg, "error") + ]) kwargs = { "unit": int(unit), @@ -521,16 +546,14 @@ def attach_device(): process = ractl_cmd.attach_device(scsi_id, **kwargs) process = ReturnCodeMapper.add_msg(process) if process["status"]: - flash(_( + return response(message=_( "Attached %(device_type)s to SCSI ID %(id_number)s LUN %(unit_number)s", device_type=get_device_name(device_type), id_number=scsi_id, unit_number=unit, )) - return redirect(url_for("index")) - flash(process["msg"], "error") - return redirect(url_for("index")) + return response(error=True, message=process["msg"]) @APP.route("/scsi/attach", methods=["POST"]) @@ -559,8 +582,7 @@ def attach_image(): process = file_cmd.read_drive_properties(drive_properties) process = ReturnCodeMapper.add_msg(process) if not process["status"]: - flash(process["msg"], "error") - return redirect(url_for("index")) + return response(error=True, message=process["msg"]) conf = process["conf"] kwargs["vendor"] = conf["vendor"] kwargs["product"] = conf["product"] @@ -571,28 +593,31 @@ def attach_image(): process = ractl_cmd.attach_device(scsi_id, **kwargs) process = ReturnCodeMapper.add_msg(process) if process["status"]: - flash(_( + response_messages = [(_( "Attached %(file_name)s as %(device_type)s to " "SCSI ID %(id_number)s LUN %(unit_number)s", file_name=file_name, device_type=get_device_name(device_type), id_number=scsi_id, unit_number=unit, - )) + ), "success")] + if int(file_size) % int(expected_block_size): - flash(_( + response_messages.append((_( "The image file size %(file_size)s bytes is not a multiple of " "%(block_size)s. RaSCSI will ignore the trailing data. " "The image may be corrupted, so proceed with caution.", file_size=file_size, block_size=expected_block_size, - ), "error") - return redirect(url_for("index")) + ), "warning")) - flash(_("Failed to attach %(file_name)s to SCSI ID %(id_number)s LUN %(unit_number)s", - file_name=file_name, id_number=scsi_id, unit_number=unit), "error") - flash(process["msg"], "error") - return redirect(url_for("index")) + return response(message=response_messages) + + return response(error=True, message=[ + (_("Failed to attach %(file_name)s to SCSI ID %(id_number)s LUN %(unit_number)s", + file_name=file_name, id_number=scsi_id, unit_number=unit), "error"), + (process["msg"], "error"), + ]) @APP.route("/scsi/detach_all", methods=["POST"]) @@ -603,11 +628,9 @@ def detach_all_devices(): """ process = ractl_cmd.detach_all() if process["status"]: - flash(_("Detached all SCSI devices")) - return redirect(url_for("index")) + return response(message=_("Detached all SCSI devices")) - flash(process["msg"], "error") - return redirect(url_for("index")) + return response(error=True, message=process["msg"]) @APP.route("/scsi/detach", methods=["POST"]) @@ -620,14 +643,14 @@ def detach(): unit = request.form.get("unit") process = ractl_cmd.detach_by_id(scsi_id, unit) if process["status"]: - flash(_("Detached SCSI ID %(id_number)s LUN %(unit_number)s", - id_number=scsi_id, unit_number=unit)) - return redirect(url_for("index")) + return response(message=_("Detached SCSI ID %(id_number)s LUN %(unit_number)s", + id_number=scsi_id, unit_number=unit)) - flash(_("Failed to detach SCSI ID %(id_number)s LUN %(unit_number)s", - id_number=scsi_id, unit_number=unit), "error") - flash(process["msg"], "error") - return redirect(url_for("index")) + return response(error=True, message=[ + (_("Failed to detach SCSI ID %(id_number)s LUN %(unit_number)s", + id_number=scsi_id, unit_number=unit), "error"), + (process["msg"], "error"), + ]) @APP.route("/scsi/eject", methods=["POST"]) @@ -641,14 +664,15 @@ def eject(): process = ractl_cmd.eject_by_id(scsi_id, unit) if process["status"]: - flash(_("Ejected SCSI ID %(id_number)s LUN %(unit_number)s", - id_number=scsi_id, unit_number=unit)) - return redirect(url_for("index")) + return response(message=_("Ejected SCSI ID %(id_number)s LUN %(unit_number)s", + id_number=scsi_id, unit_number=unit)) + + return response(error=True, message=[ + (_("Failed to eject SCSI ID %(id_number)s LUN %(unit_number)s", + id_number=scsi_id, unit_number=unit), "error"), + (process["msg"], "error"), + ]) - flash(_("Failed to eject SCSI ID %(id_number)s LUN %(unit_number)s", - id_number=scsi_id, unit_number=unit), "error") - flash(process["msg"], "error") - return redirect(url_for("index")) @APP.route("/scsi/info", methods=["POST"]) def device_info(): @@ -662,29 +686,47 @@ def device_info(): # First check if any device at all was returned if not devices["status"]: - flash(devices["msg"], "error") - return redirect(url_for("index")) + return response(error=True, message=devices["msg"]) # Looking at the first dict in list to get # the one and only device that should have been returned device = devices["device_list"][0] if str(device["id"]) == scsi_id: - flash(_("DEVICE INFO")) - flash("===========") - flash(_("SCSI ID: %(id_number)s", id_number=device["id"])) - flash(_("LUN: %(unit_number)s", unit_number=device["unit"])) - flash(_("Type: %(device_type)s", device_type=device["device_type"])) - flash(_("Status: %(device_status)s", device_status=device["status"])) - flash(_("File: %(image_file)s", image_file=device["image"])) - flash(_("Parameters: %(value)s", value=device["params"])) - flash(_("Vendor: %(value)s", value=device["vendor"])) - flash(_("Product: %(value)s", value=device["product"])) - flash(_("Revision: %(revision_number)s", revision_number=device["revision"])) - flash(_("Block Size: %(value)s bytes", value=device["block_size"])) - flash(_("Image Size: %(value)s bytes", value=device["size"])) - return redirect(url_for("index")) + message = "\n".join([ + _("DEVICE INFO"), + "===========", + _("SCSI ID: %(id_number)s", id_number=device["id"]), + _("LUN: %(unit_number)s", unit_number=device["unit"]), + _("Type: %(device_type)s", device_type=device["device_type"]), + _("Status: %(device_status)s", device_status=device["status"]), + _("File: %(image_file)s", image_file=device["image"]), + _("Parameters: %(value)s", value=device["params"]), + _("Vendor: %(value)s", value=device["vendor"]), + _("Product: %(value)s", value=device["product"]), + _("Revision: %(revision_number)s", revision_number=device["revision"]), + _("Block Size: %(value)s bytes", value=device["block_size"]), + _("Image Size: %(value)s bytes", value=device["size"]), + ]) + + # Don't send redundant "info" message with the JSON response + if request.headers.get("accept") == "application/json": + return response(device_info={ + "scsi_id": device["id"], + "lun": device["unit"], + "device_type": device["device_type"], + "status": device["status"], + "file": device["image"], + "parameters": device["params"], + "vendor": device["vendor"], + "product": device["product"], + "revision": device["revision"], + "block_size": device["block_size"], + "size": device["size"], + }) + + return response(message=[(message, "info")]) + + return response(error=True, message=devices["msg"]) - flash(devices["msg"], "error") - return redirect(url_for("index")) @APP.route("/scsi/reserve", methods=["POST"]) @login_required @@ -699,12 +741,13 @@ def reserve_id(): process = ractl_cmd.reserve_scsi_ids(reserved_ids) if process["status"]: RESERVATIONS[int(scsi_id)] = memo - flash(_("Reserved SCSI ID %(id_number)s", id_number=scsi_id)) - return redirect(url_for("index")) + return response(message=_("Reserved SCSI ID %(id_number)s", id_number=scsi_id)) + + return response(error=True, message=[ + (_("Failed to reserve SCSI ID %(id_number)s", id_number=scsi_id), "error"), + (process["msg"], "error"), + ]) - flash(_("Failed to reserve SCSI ID %(id_number)s", id_number=scsi_id)) - flash(process["msg"], "error") - return redirect(url_for("index")) @APP.route("/scsi/release", methods=["POST"]) @login_required @@ -718,12 +761,12 @@ def release_id(): process = ractl_cmd.reserve_scsi_ids(reserved_ids) if process["status"]: RESERVATIONS[int(scsi_id)] = "" - flash(_("Released the reservation for SCSI ID %(id_number)s", id_number=scsi_id)) - return redirect(url_for("index")) + return response(message=_("Released the reservation for SCSI ID %(id_number)s", id_number=scsi_id)) - flash(_("Failed to release the reservation for SCSI ID %(id_number)s", id_number=scsi_id)) - flash(process["msg"], "error") - return redirect(url_for("index")) + return response(error=True, message=[ + (_("Failed to release the reservation for SCSI ID %(id_number)s", id_number=scsi_id), "error"), + (process["msg"], "error"), + ]) @APP.route("/pi/reboot", methods=["POST"]) @@ -733,7 +776,7 @@ def restart(): Restarts the Pi """ ractl_cmd.shutdown_pi("reboot") - return redirect(url_for("index")) + return response() @APP.route("/pi/shutdown", methods=["POST"]) @@ -743,7 +786,7 @@ def shutdown(): Shuts down the Pi """ ractl_cmd.shutdown_pi("system") - return redirect(url_for("index")) + return response() @APP.route("/files/download_to_iso", methods=["POST"]) @@ -755,16 +798,18 @@ def download_to_iso(): scsi_id = request.form.get("scsi_id") url = request.form.get("url") iso_args = request.form.get("type").split() + response_messages = [] process = file_cmd.download_file_to_iso(url, *iso_args) process = ReturnCodeMapper.add_msg(process) - if process["status"]: - flash(process["msg"]) - flash(_("Saved image as: %(file_name)s", file_name=process['file_name'])) - else: - flash(_("Failed to create CD-ROM image from %(url)s", url=url), "error") - flash(process["msg"], "error") - return redirect(url_for("index")) + if not process["status"]: + return response(error=True, message=[ + (_("Failed to create CD-ROM image from %(url)s", url=url), "error"), + (process["msg"], "error"), + ]) + + response_messages.append((process["msg"], "success")) + response_messages.append((_("Saved image as: %(file_name)s", file_name=process['file_name']), "success")) process_attach = ractl_cmd.attach_device( scsi_id, @@ -773,13 +818,13 @@ def download_to_iso(): ) process_attach = ReturnCodeMapper.add_msg(process_attach) if process_attach["status"]: - flash(_("Attached to SCSI ID %(id_number)s", id_number=scsi_id)) - return redirect(url_for("index")) + response_messages.append((_("Attached to SCSI ID %(id_number)s", id_number=scsi_id), "success")) + return response(message=response_messages) - flash(_("Failed to attach image to SCSI ID %(id_number)s. Try attaching it manually.", - id_number=scsi_id), "error") - flash(process_attach["msg"], "error") - return redirect(url_for("index")) + return response(error=True, message=[ + (_("Failed to attach image to SCSI ID %(id_number)s. Try attaching it manually.", id_number=scsi_id), "error"), + (process_attach["msg"], "error"), + ]) @APP.route("/files/download_to_images", methods=["POST"]) @@ -793,12 +838,12 @@ def download_img(): process = file_cmd.download_to_dir(url, server_info["image_dir"], Path(url).name) process = ReturnCodeMapper.add_msg(process) if process["status"]: - flash(process["msg"]) - return redirect(url_for("index")) + return response(message=process["msg"]) - flash(_("Failed to download file from %(url)s", url=url), "error") - flash(process["msg"], "error") - return redirect(url_for("index")) + return response(error=True, message=[ + (_("Failed to download file from %(url)s", url=url), "error"), + (process["msg"], "error"), + ]) @APP.route("/files/download_to_afp", methods=["POST"]) @@ -812,12 +857,12 @@ def download_afp(): process = file_cmd.download_to_dir(url, AFP_DIR, file_name) process = ReturnCodeMapper.add_msg(process) if process["status"]: - flash(process["msg"]) - return redirect(url_for("index")) + return response(message=process["msg"]) - flash(_("Failed to download file from %(url)s", url=url), "error") - flash(process["msg"], "error") - return redirect(url_for("index")) + return response(error=True, message=[ + (_("Failed to download file from %(url)s", url=url), "error"), + (process["msg"], "error"), + ]) @APP.route("/files/upload", methods=["POST"]) @@ -848,11 +893,13 @@ def create_file(): process = file_cmd.create_new_image(file_name, file_type, size) if process["status"]: - flash(_("Image file created: %(file_name)s", file_name=full_file_name)) - return redirect(url_for("index")) + return response( + status_code=201, + message=_("Image file created: %(file_name)s", file_name=full_file_name), + image=full_file_name, + ) - flash(process["msg"], "error") - return redirect(url_for("index")) + return response(error=True, message=process["msg"]) @APP.route("/files/download", methods=["POST"]) @@ -874,26 +921,23 @@ def delete(): file_name = request.form.get("file_name") process = file_cmd.delete_image(file_name) - if process["status"]: - flash(_("Image file deleted: %(file_name)s", file_name=file_name)) - else: - flash(process["msg"], "error") - return redirect(url_for("index")) + if not process["status"]: + return response(error=True, message=process["msg"]) + + response_messages = [ + (_("Image file deleted: %(file_name)s", file_name=file_name), "success")] # Delete the drive properties file, if it exists prop_file_path = f"{CFG_DIR}/{file_name}.{PROPERTIES_SUFFIX}" if Path(prop_file_path).is_file(): process = file_cmd.delete_file(prop_file_path) process = ReturnCodeMapper.add_msg(process) - if process["status"]: - flash(process["msg"]) - return redirect(url_for("index")) + response_messages.append((process["msg"], "success")) + else: + response_messages.append((process["msg"], "error")) - flash(process["msg"], "error") - return redirect(url_for("index")) - - return redirect(url_for("index")) + return response(message=response_messages) @APP.route("/files/rename", methods=["POST"]) @@ -906,11 +950,11 @@ def rename(): new_file_name = request.form.get("new_file_name") process = file_cmd.rename_image(file_name, new_file_name) - if process["status"]: - flash(_("Image file renamed to: %(file_name)s", file_name=new_file_name)) - else: - flash(process["msg"], "error") - return redirect(url_for("index")) + if not process["status"]: + return response(error=True, message=process["msg"]) + + response_messages = [ + (_("Image file renamed to: %(file_name)s", file_name=new_file_name), "success")] # Rename the drive properties file, if it exists prop_file_path = f"{CFG_DIR}/{file_name}.{PROPERTIES_SUFFIX}" @@ -919,13 +963,11 @@ def rename(): process = file_cmd.rename_file(prop_file_path, new_prop_file_path) process = ReturnCodeMapper.add_msg(process) if process["status"]: - flash(process["msg"]) - return redirect(url_for("index")) + response_messages.append((process["msg"], "success")) + else: + response_messages.append((process["msg"], "error")) - flash(process["msg"], "error") - return redirect(url_for("index")) - - return redirect(url_for("index")) + return response(message=response_messages) @APP.route("/files/copy", methods=["POST"]) @@ -938,11 +980,11 @@ def copy(): new_file_name = request.form.get("copy_file_name") process = file_cmd.copy_image(file_name, new_file_name) - if process["status"]: - flash(_("Copy of image file saved as: %(file_name)s", file_name=new_file_name)) - else: - flash(process["msg"], "error") - return redirect(url_for("index")) + if not process["status"]: + return response(error=True, message=process["msg"]) + + response_messages = [ + (_("Copy of image file saved as: %(file_name)s", file_name=new_file_name), "success")] # Create a copy of the drive properties file, if it exists prop_file_path = f"{CFG_DIR}/{file_name}.{PROPERTIES_SUFFIX}" @@ -951,13 +993,11 @@ def copy(): process = file_cmd.copy_file(prop_file_path, new_prop_file_path) process = ReturnCodeMapper.add_msg(process) if process["status"]: - flash(process["msg"]) - return redirect(url_for("index")) + response_messages.append((process["msg"], "success")) + else: + response_messages.append((process["msg"], "error")) - flash(process["msg"], "error") - return redirect(url_for("index")) - - return redirect(url_for("index")) + return response(message=response_messages) @APP.route("/files/extract_image", methods=["POST"]) @@ -976,23 +1016,23 @@ def extract_image(): ) if extract_result["return_code"] == ReturnCodes.EXTRACTIMAGE_SUCCESS: - flash(ReturnCodeMapper.add_msg(extract_result).get("msg")) + response_messages = [(ReturnCodeMapper.add_msg(extract_result).get("msg"), "success")] for properties_file in extract_result["properties_files_moved"]: if properties_file["status"]: - flash(_("Properties file %(file)s moved to %(directory)s", + response_messages.append((_("Properties file %(file)s moved to %(directory)s", file=properties_file['name'], directory=CFG_DIR - )) + ), "success")) else: - flash(_("Failed to move properties file %(file)s to %(directory)s", + response_messages.append((_("Failed to move properties file %(file)s to %(directory)s", file=properties_file['name'], directory=CFG_DIR - ), "error") - else: - flash(ReturnCodeMapper.add_msg(extract_result).get("msg"), "error") + ), "error")) - return redirect(url_for("index")) + return response(message=response_messages) + + return response(error=True, message=ReturnCodeMapper.add_msg(extract_result).get("msg")) @APP.route("/language", methods=["POST"]) @@ -1008,8 +1048,7 @@ def change_language(): language = Locale.parse(locale) language_name = language.get_language_name(locale) - flash(_("Changed Web Interface language to %(locale)s", locale=language_name)) - return redirect(url_for("index")) + return response(message=_("Changed Web Interface language to %(locale)s", locale=language_name)) @APP.before_first_request From dd40d7156ad5fb12c931e12b1dfef4c6cb758575 Mon Sep 17 00:00:00 2001 From: nucleogenic Date: Mon, 19 Sep 2022 14:16:34 +0100 Subject: [PATCH 05/16] Fix issue causing stale reservations after loading a config --- python/common/src/rascsi/file_cmds.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/python/common/src/rascsi/file_cmds.py b/python/common/src/rascsi/file_cmds.py index 11b2d9a6..de6f4971 100644 --- a/python/common/src/rascsi/file_cmds.py +++ b/python/common/src/rascsi/file_cmds.py @@ -521,6 +521,8 @@ class FileCmds: # introduce more sophisticated format detection logic here. if isinstance(config, dict): self.ractl.detach_all() + for scsi_id in range(0, 8): + RESERVATIONS[scsi_id] = "" ids_to_reserve = [] for item in config["reserved_ids"]: ids_to_reserve.append(item["id"]) From 6ad436c085d43ea96abf4f74ef9d5e7bb77863f2 Mon Sep 17 00:00:00 2001 From: nucleogenic Date: Mon, 19 Sep 2022 14:18:53 +0100 Subject: [PATCH 06/16] Add --headless option to easyinstall.sh, enable web auth by default on standalone web UI installs --- easyinstall.sh | 38 ++++++++++++++++++++++++++++---------- 1 file changed, 28 insertions(+), 10 deletions(-) diff --git a/easyinstall.sh b/easyinstall.sh index 77489f4e..69e4ef39 100755 --- a/easyinstall.sh +++ b/easyinstall.sh @@ -691,23 +691,31 @@ function setupWiredNetworking() { echo "WARNING: If you continue, the IP address of your Pi may change upon reboot." echo "Please make sure you will not lose access to the Pi system." echo "" - echo "Do you want to proceed with network configuration using the default settings? [Y/n]" - read REPLY - if [ "$REPLY" == "N" ] || [ "$REPLY" == "n" ]; then - echo "Available wired interfaces on this system:" - echo `ip -o addr show scope link | awk '{split($0, a); print $2}' | grep eth` - echo "Please type the wired interface you want to use and press Enter:" - read SELECTED - LAN_INTERFACE=$SELECTED + if [[ -z $HEADLESS ]]; then + echo "Do you want to proceed with network configuration using the default settings? [Y/n]" + read REPLY + + if [ "$REPLY" == "N" ] || [ "$REPLY" == "n" ]; then + echo "Available wired interfaces on this system:" + echo `ip -o addr show scope link | awk '{split($0, a); print $2}' | grep eth` + echo "Please type the wired interface you want to use and press Enter:" + read SELECTED + LAN_INTERFACE=$SELECTED + fi fi if [ "$(grep -c "^denyinterfaces" /etc/dhcpcd.conf)" -ge 1 ]; then echo "WARNING: Network forwarding may already have been configured. Proceeding will overwrite the configuration." - echo "Press enter to continue or CTRL-C to exit" - read REPLY + + if [[ -z $HEADLESS ]]; then + echo "Press enter to continue or CTRL-C to exit" + read REPLY + fi + sudo sed -i /^denyinterfaces/d /etc/dhcpcd.conf fi + sudo bash -c 'echo "denyinterfaces '$LAN_INTERFACE'" >> /etc/dhcpcd.conf' echo "Modified /etc/dhcpcd.conf" @@ -720,6 +728,12 @@ function setupWiredNetworking() { echo "Either use the Web UI, or do this on the command line (assuming SCSI ID 6):" echo "rasctl -i 6 -c attach -t scdp -f $LAN_INTERFACE" echo "" + + if [[ $HEADLESS ]]; then + echo "Skipping reboot in headless mode" + return 0 + fi + echo "We need to reboot your Pi" echo "Press Enter to reboot or CTRL-C to exit" read @@ -1269,6 +1283,7 @@ function runChoice() { preparePythonCommon cachePipPackages installRaScsiWebInterface + enableWebInterfaceAuth echo "Configuring RaSCSI Web Interface stand-alone - Complete!" echo "Launch the Web Interface with the 'start.sh' script. To use a custom port for the web server: 'start.sh --web-port=8081" ;; @@ -1367,6 +1382,9 @@ while [ "$1" != "" ]; do -s | --skip-token) SKIP_TOKEN=1 ;; + -h | --headless) + HEADLESS=1 + ;; *) echo "ERROR: Unknown parameter \"$PARAM\"" exit 1 From 26aa5ebc2a989134c35bc26a7acb28900ceee615 Mon Sep 17 00:00:00 2001 From: nucleogenic Date: Mon, 19 Sep 2022 14:20:12 +0100 Subject: [PATCH 07/16] Update Dockerfiles to allow testing of additional RaSCSI web UI features --- docker/rascsi-web/Dockerfile | 19 +++++++++++++++---- docker/rascsi/Dockerfile | 7 ++++--- 2 files changed, 19 insertions(+), 7 deletions(-) diff --git a/docker/rascsi-web/Dockerfile b/docker/rascsi-web/Dockerfile index 4c4298d5..fbd8b7ac 100644 --- a/docker/rascsi-web/Dockerfile +++ b/docker/rascsi-web/Dockerfile @@ -6,18 +6,29 @@ FROM "${OS_ARCH}/${OS_DISTRO}:${OS_VERSION}" EXPOSE 80 443 ARG DEBIAN_FRONTEND=noninteractive -RUN apt-get update && apt-get install -y --no-install-recommends sudo rsyslog procps +RUN apt-get update && apt-get install -y --no-install-recommends sudo systemd rsyslog procps RUN groupadd pi RUN useradd --create-home --shell /bin/bash -g pi pi RUN echo "pi ALL=(ALL) NOPASSWD:ALL" >> /etc/sudoers +RUN echo "pi:rascsi" | chpasswd -WORKDIR /home/pi +RUN mkdir /home/pi/afpshare +RUN touch /etc/dhcpcd.conf +RUN mkdir -p /etc/network/interfaces.d/ + +WORKDIR /home/pi/RASCSI USER pi -COPY --chown=pi:pi . RASCSI -RUN cd RASCSI && ./easyinstall.sh --run_choice=11 --skip-token +COPY --chown=pi:pi . . + +# Standalone RaSCSI web UI +RUN ./easyinstall.sh --run_choice=11 --skip-token + +# Wired network bridge +RUN ./easyinstall.sh --run_choice=6 --headless USER root +WORKDIR /home/pi RUN pip3 install watchdog COPY docker/rascsi-web/start.sh /usr/local/bin/start.sh RUN chmod +x /usr/local/bin/start.sh diff --git a/docker/rascsi/Dockerfile b/docker/rascsi/Dockerfile index 508bd185..4e3c60f5 100644 --- a/docker/rascsi/Dockerfile +++ b/docker/rascsi/Dockerfile @@ -6,15 +6,15 @@ FROM "${OS_ARCH}/${OS_DISTRO}:${OS_VERSION}" EXPOSE 6868 ARG DEBIAN_FRONTEND=noninteractive -RUN apt-get update && apt-get install -y --no-install-recommends sudo rsyslog patch +RUN apt-get update && apt-get install -y --no-install-recommends sudo systemd rsyslog patch RUN groupadd pi RUN useradd --create-home --shell /bin/bash -g pi pi RUN echo "pi ALL=(ALL) NOPASSWD:ALL" >> /etc/sudoers -USER pi -COPY --chown=pi:pi . /home/pi/RASCSI WORKDIR /home/pi/RASCSI +USER pi +COPY --chown=pi:pi . . # Workaround for Bullseye amd64 compilation error # https://github.com/akuker/RASCSI/issues/821 @@ -24,6 +24,7 @@ RUN patch -p0 < docker/rascsi/cfilesystem.patch RUN ./easyinstall.sh --run_choice=10 --cores=`nproc` --skip-token USER root +WORKDIR /home/pi COPY docker/rascsi/rascsi_wrapper.sh /usr/local/bin/rascsi_wrapper.sh RUN chmod +x /usr/local/bin/rascsi_wrapper.sh CMD ["/usr/local/bin/rascsi_wrapper.sh", "-L", "trace", "-r", "7", "-F", "/home/pi/images"] From 0e6147e0bb2b81b2b8b7976d5e75adfd9a2d7dca Mon Sep 17 00:00:00 2001 From: nucleogenic Date: Mon, 19 Sep 2022 14:21:31 +0100 Subject: [PATCH 08/16] Setup pytest, flake8, black + add API tests --- .gitignore | 1 + python/web/.flake8 | 2 + python/web/pyproject.toml | 8 + python/web/requirements-dev.txt | 3 + python/web/tests/.gitkeep | 0 python/web/tests/api/test_json_api.py | 894 +++++++++++++++++++++ python/web/tests/assets/test_image.hds | Bin 0 -> 1048576 bytes python/web/tests/assets/test_image.hds.zip | Bin 0 -> 1239 bytes python/web/tests/conftest.py | 53 ++ 9 files changed, 961 insertions(+) create mode 100644 python/web/.flake8 create mode 100644 python/web/pyproject.toml create mode 100644 python/web/requirements-dev.txt delete mode 100644 python/web/tests/.gitkeep create mode 100644 python/web/tests/api/test_json_api.py create mode 100644 python/web/tests/assets/test_image.hds create mode 100644 python/web/tests/assets/test_image.hds.zip create mode 100644 python/web/tests/conftest.py diff --git a/.gitignore b/.gitignore index 7a14233f..f9714d4f 100644 --- a/.gitignore +++ b/.gitignore @@ -11,6 +11,7 @@ src/raspberrypi/hfdisk/ *~ messages.pot messages.mo +report.xml docker/docker-compose.override.yml /docker/volumes/images/* diff --git a/python/web/.flake8 b/python/web/.flake8 new file mode 100644 index 00000000..7da1f960 --- /dev/null +++ b/python/web/.flake8 @@ -0,0 +1,2 @@ +[flake8] +max-line-length = 100 diff --git a/python/web/pyproject.toml b/python/web/pyproject.toml new file mode 100644 index 00000000..7adcc930 --- /dev/null +++ b/python/web/pyproject.toml @@ -0,0 +1,8 @@ +[tool.pytest.ini_options] +addopts = "--junitxml=report.xml" +log_cli = true +log_cli_level = "warn" + +[tool.black] +line-length = 100 +target-version = ['py37', 'py38', 'py39'] diff --git a/python/web/requirements-dev.txt b/python/web/requirements-dev.txt new file mode 100644 index 00000000..1174a04e --- /dev/null +++ b/python/web/requirements-dev.txt @@ -0,0 +1,3 @@ +pytest==7.1.3 +pytest-httpserver==1.0.6 +black==22.8.0 diff --git a/python/web/tests/.gitkeep b/python/web/tests/.gitkeep deleted file mode 100644 index e69de29b..00000000 diff --git a/python/web/tests/api/test_json_api.py b/python/web/tests/api/test_json_api.py new file mode 100644 index 00000000..5b2d7dcc --- /dev/null +++ b/python/web/tests/api/test_json_api.py @@ -0,0 +1,894 @@ +import pytest +import uuid +import os + +CFG_DIR = "/home/pi/.config/rascsi" +IMAGES_DIR = "/home/pi/images" +AFP_DIR = "/home/pi/afpshare" +SCSI_ID = 6 +FILE_SIZE_1_MIB = 1048576 +STATUS_SUCCESS = "success" +STATUS_ERROR = "error" + + +@pytest.fixture(scope="function") +def create_test_image(request, http_client): + images = [] + + def create(image_type="hds", size=1, auto_delete=True): + file_prefix = str(uuid.uuid4()) + file_name = f"{file_prefix}.{image_type}" + + response = http_client.post( + "/files/create", + data={ + "file_name": file_prefix, + "type": image_type, + "size": size, + }, + ) + + if response.json()["status"] != STATUS_SUCCESS: + raise Exception("Failed to create temporary image") + + if auto_delete: + images.append(file_name) + + return file_name + + def delete(): + for image in images: + http_client.post("/files/delete", data={"file_name": image}) + + request.addfinalizer(delete) + return create + + +@pytest.fixture(scope="function") +def list_files(http_client): + def files(): + return [f["name"] for f in http_client.get("/").json()["data"]["files"]] + + return files + + +@pytest.fixture(scope="function") +def list_attached_images(http_client): + def files(): + return http_client.get("/").json()["data"]["attached_images"] + + return files + + +@pytest.fixture(scope="function") +def delete_file(http_client): + def delete(file_name): + http_client.post("/files/delete", data={"file_name": file_name}) + + return delete + + +@pytest.fixture(scope="function") +def detach_devices(http_client): + def detach(): + response = http_client.post("/scsi/detach_all") + if response.json()["status"] == STATUS_SUCCESS: + return True + raise Exception("Failed to detach SCSI devices") + + return detach + + +""" +AUTHENTICATION +""" + + +# route("/login", methods=["POST"]) +def test_login_with_valid_credentials(pytestconfig, http_client_unauthenticated): + response = http_client_unauthenticated.post( + "/login", + data={ + "username": pytestconfig.getoption("rascsi_username"), + "password": pytestconfig.getoption("rascsi_password"), + }, + ) + + response_data = response.json() + + assert response.status_code == 200 + assert response_data["status"] == STATUS_SUCCESS + + +# route("/login", methods=["POST"]) +def test_login_with_invalid_credentials(http_client_unauthenticated): + response = http_client_unauthenticated.post( + "/login", + data={ + "username": "__INVALID_USER__", + "password": "__INVALID_PASS__", + }, + ) + + response_data = response.json() + + assert response.status_code == 401 + assert response_data["status"] == STATUS_ERROR + assert response_data["messages"][0]["message"] == ( + "You must log in with valid credentials for a user in the 'rascsi' group" + ) + + +# route("/logout") +def test_logout(http_client): + response = http_client.get("/logout") + response.status_code == 200 + + +""" +DEVICE OPERATIONS +""" + + +# route("/scsi/attach", methods=["POST"]) +def test_attach_image(http_client, create_test_image, detach_devices): + test_image = create_test_image() + + response = http_client.post( + "/scsi/attach", + data={ + "file_name": test_image, + "file_size": FILE_SIZE_1_MIB, + "scsi_id": SCSI_ID, + "unit": 0, + "type": "SCHD", + }, + ) + response_data = response.json() + + assert response.status_code == 200 + assert response_data["status"] == STATUS_SUCCESS + assert response_data["messages"][0]["message"] == ( + f"Attached {test_image} as Hard Disk to SCSI ID {SCSI_ID} LUN 0" + ) + + # Cleanup + detach_devices() + + +# route("/scsi/attach_device", methods=["POST"]) +@pytest.mark.parametrize( + "device_name,device_config", + [ + # TODO: Fix networking in container, else SCBR attachment fails + # ("X68000 Host Bridge", {"type": "SCBR", "interface": "eth0", "inet": "10.10.20.1/24"}), + ("DaynaPORT SCSI/Link", {"type": "SCDP", "interface": "eth0", "inet": "10.10.20.1/24"}), + ("Host Services", {"type": "SCHS"}), + ("Printer", {"type": "SCLP", "timeout": 30, "cmd": "lp -oraw %f"}), + ], +) +def test_attach_device(http_client, detach_devices, device_name, device_config): + device_config["scsi_id"] = SCSI_ID + device_config["unit"] = 0 + + response = http_client.post( + "/scsi/attach_device", + data=device_config, + ) + + response_data = response.json() + + assert response.status_code == 200 + assert response_data["status"] == STATUS_SUCCESS + assert response_data["messages"][0]["message"] == ( + f"Attached {device_name} to SCSI ID {SCSI_ID} LUN 0" + ) + + # Cleanup + detach_devices() + + +# route("/scsi/detach", methods=["POST"]) +def test_detach_device(http_client, create_test_image): + test_image = create_test_image() + + http_client.post( + "/scsi/attach", + data={ + "file_name": test_image, + "file_size": FILE_SIZE_1_MIB, + "scsi_id": SCSI_ID, + "unit": 0, + "type": "SCHD", + }, + ) + + response = http_client.post( + "/scsi/detach", + data={ + "scsi_id": SCSI_ID, + "unit": 0, + }, + ) + + response_data = response.json() + + response.status_code == 200 + response_data["status"] == STATUS_SUCCESS + response_data["messages"][0]["message"] == f"Detached SCSI ID {SCSI_ID} LUN 0" + + +# route("/scsi/detach_all", methods=["POST"]) +def test_detach_all_devices(http_client, create_test_image, list_attached_images): + test_images = [] + scsi_ids = [4, 5, 6] + + for scsi_id in scsi_ids: + test_image = create_test_image() + test_images.append(test_image) + + http_client.post( + "/scsi/attach", + data={ + "file_name": test_image, + "file_size": FILE_SIZE_1_MIB, + "scsi_id": scsi_id, + "unit": 0, + "type": "SCHD", + }, + ) + + assert list_attached_images() == test_images + + response = http_client.post("/scsi/detach_all") + response_data = response.json() + + assert response.status_code == 200 + assert response_data["status"] == STATUS_SUCCESS + assert list_attached_images() == [] + + +# route("/scsi/eject", methods=["POST"]) +def test_eject_device(http_client, create_test_image, detach_devices): + test_image = create_test_image() + + http_client.post( + "/scsi/attach", + data={ + "file_name": test_image, + "file_size": FILE_SIZE_1_MIB, + "scsi_id": SCSI_ID, + "unit": 0, + "type": "SCCD", # CD-ROM + }, + ) + + response = http_client.post( + "/scsi/eject", + data={ + "scsi_id": SCSI_ID, + "unit": 0, + }, + ) + + response_data = response.json() + + assert response.status_code == 200 + assert response_data["status"] == STATUS_SUCCESS + assert response_data["messages"][0]["message"] == f"Ejected SCSI ID {SCSI_ID} LUN 0" + + # Cleanup + detach_devices() + + +# route("/scsi/info", methods=["POST"]) +def test_show_device_info(http_client, create_test_image, detach_devices): + test_image = create_test_image() + + http_client.post( + "/scsi/attach", + data={ + "file_name": test_image, + "file_size": FILE_SIZE_1_MIB, + "scsi_id": SCSI_ID, + "unit": 0, + "type": "SCHD", + }, + ) + + response = http_client.post( + "/scsi/info", + data={ + "scsi_id": SCSI_ID, + }, + ) + + response_data = response.json() + + assert response.status_code == 200 + assert response_data["status"] == STATUS_SUCCESS + assert "device_info" in response_data["data"] + assert response_data["data"]["device_info"]["file"] == f"{IMAGES_DIR}/{test_image}" + + # Cleanup + detach_devices() + + +# route("/scsi/reserve", methods=["POST"]) +# route("/scsi/release", methods=["POST"]) +def test_reserve_and_release_device(http_client): + scsi_id = 0 + + response = http_client.post( + "/scsi/reserve", + data={ + "scsi_id": scsi_id, + "memo": "TEST", + }, + ) + + response_data = response.json() + + assert response.status_code == 200 + assert response_data["status"] == STATUS_SUCCESS + assert response_data["messages"][0]["message"] == f"Reserved SCSI ID {scsi_id}" + + response = http_client.post( + "/scsi/release", + data={ + "scsi_id": scsi_id, + }, + ) + + response_data = response.json() + + assert response.status_code == 200 + assert response_data["status"] == STATUS_SUCCESS + assert response_data["messages"][0]["message"] == ( + f"Released the reservation for SCSI ID {scsi_id}" + ) + + +""" +FILE OPERATIONS +""" + + +# route("/files/create", methods=["POST"]) +def test_create_file(http_client, list_files, delete_file): + file_prefix = str(uuid.uuid4()) + file_name = f"{file_prefix}.hds" + + response = http_client.post( + "/files/create", + data={ + "file_name": file_prefix, + "type": "hds", + "size": 1, + }, + ) + + response_data = response.json() + + assert response.status_code == 201 + assert response_data["status"] == STATUS_SUCCESS + assert response_data["data"]["image"] == file_name + assert response_data["messages"][0]["message"] == f"Image file created: {file_name}" + assert file_name in list_files() + + # Cleanup + delete_file(file_name) + + +# route("/files/rename", methods=["POST"]) +def test_rename_file(http_client, create_test_image, list_files, delete_file): + original_file = create_test_image(auto_delete=False) + renamed_file = f"{uuid.uuid4()}.rename" + + response = http_client.post( + "/files/rename", + data={"file_name": original_file, "new_file_name": renamed_file}, + ) + + response_data = response.json() + + assert response.status_code == 200 + assert response_data["status"] == STATUS_SUCCESS + assert response_data["messages"][0]["message"] == f"Image file renamed to: {renamed_file}" + assert renamed_file in list_files() + + # Cleanup + delete_file(renamed_file) + + +# route("/files/copy", methods=["POST"]) +def test_copy_file(http_client, create_test_image, list_files, delete_file): + original_file = create_test_image() + copy_file = f"{uuid.uuid4()}.copy" + + response = http_client.post( + "/files/copy", + data={ + "file_name": original_file, + "copy_file_name": copy_file, + }, + ) + + response_data = response.json() + files = list_files() + + assert response.status_code == 200 + assert response_data["status"] == STATUS_SUCCESS + assert response_data["messages"][0]["message"] == f"Copy of image file saved as: {copy_file}" + assert original_file in files + assert copy_file in files + + # Cleanup + delete_file(copy_file) + + +# route("/files/delete", methods=["POST"]) +def test_delete_file(http_client, create_test_image, list_files): + file_name = create_test_image() + + response = http_client.post("/files/delete", data={"file_name": file_name}) + + response_data = response.json() + + assert response.status_code == 200 + assert response_data["status"] == STATUS_SUCCESS + assert response_data["messages"][0]["message"] == f"Image file deleted: {file_name}" + assert file_name not in list_files() + + +# route("/files/extract_image", methods=["POST"]) +# TODO: Add test files for all supported formats +def test_extract_file(httpserver, http_client, list_files, delete_file): + image_file_name = "test_image.hds" + zip_file_name = "test_image.hds.zip" + http_path = f"/images/{zip_file_name}" + url = httpserver.url_for(http_path) + + with open(f"tests/assets/{zip_file_name}", mode="rb") as file: + zip_file_data = file.read() + + httpserver.expect_request(http_path).respond_with_data( + zip_file_data, + mimetype="application/octet-stream", + ) + + http_client.post( + "/files/download_to_images", + data={ + "url": url, + }, + ) + + response = http_client.post( + "/files/extract_image", + data={ + "archive_file": zip_file_name, + "archive_members": image_file_name, + }, + ) + + response_data = response.json() + + assert response.status_code == 200 + assert response_data["status"] == STATUS_SUCCESS + assert response_data["messages"][0]["message"] == "Extracted 1 file(s)" + assert image_file_name in list_files() + + # Cleanup + delete_file(zip_file_name) + delete_file(image_file_name) + + +# route("/files/upload", methods=["POST"]) +def test_upload_file(http_client, delete_file): + file_name = f"{uuid.uuid4()}.test" + + with open("tests/assets/test_image.hds", mode="rb") as file: + file.seek(0, os.SEEK_END) + file_size = file.tell() + file.seek(0, 0) + + number_of_chunks = 4 + + # Note: The test file needs to be cleanly divisible by the chunk size + chunk_size = int(file_size / number_of_chunks) + + for chunk_number in range(0, 4): + if chunk_number == 0: + chunk_byte_offset = 0 + else: + chunk_byte_offset = chunk_number * chunk_size + + form_data = { + "dzuuid": str(uuid.uuid4()), + "dzchunkindex": chunk_number, + "dzchunksize": chunk_size, + "dzchunkbyteoffset": chunk_byte_offset, + "dztotalfilesize": file_size, + "dztotalchunkcount": number_of_chunks, + } + + file_data = {"file": (file_name, file.read(chunk_size))} + + response = http_client.post( + "/files/upload", + data=form_data, + files=file_data, + ) + + assert response.status_code == 200 + assert response.text == "File upload successful!" + + file = [f for f in http_client.get("/").json()["data"]["files"] if f["name"] == file_name][0] + + assert file["size"] == file_size + + # Cleanup + delete_file(file_name) + + +# route("/files/download", methods=["POST"]) +def test_download_file(http_client, create_test_image): + file_name = create_test_image() + + response = http_client.post("/files/download", data={"file": f"{IMAGES_DIR}/{file_name}"}) + + assert response.status_code == 200 + assert response.headers["content-type"] == "application/octet-stream" + assert response.headers["content-disposition"] == f"attachment; filename={file_name}" + assert response.headers["content-length"] == str(FILE_SIZE_1_MIB) + + +# route("/files/download_to_afp", methods=["POST"]) +def test_download_url_to_afp_dir(httpserver, http_client): + file_name = str(uuid.uuid4()) + http_path = f"/images/{file_name}" + url = httpserver.url_for(http_path) + + with open("tests/assets/test_image.hds", mode="rb") as file: + file_data = file.read() + + httpserver.expect_request(http_path).respond_with_data( + file_data, + mimetype="application/octet-stream", + ) + + response = http_client.post( + "/files/download_to_afp", + data={ + "url": url, + }, + ) + + response_data = response.json() + + assert response.status_code == 200 + assert response_data["status"] == STATUS_SUCCESS + assert response_data["messages"][0]["message"] == f"{file_name} downloaded to {AFP_DIR}" + + +# route("/files/download_to_images", methods=["POST"]) +def test_download_url_to_images_dir(httpserver, http_client, list_files, delete_file): + file_name = str(uuid.uuid4()) + http_path = f"/images/{file_name}" + url = httpserver.url_for(http_path) + + with open("tests/assets/test_image.hds", mode="rb") as file: + test_file_data = file.read() + + httpserver.expect_request(http_path).respond_with_data( + test_file_data, + mimetype="application/octet-stream", + ) + + response = http_client.post( + "/files/download_to_images", + data={ + "url": url, + }, + ) + + response_data = response.json() + + assert response.status_code == 200 + assert response_data["status"] == STATUS_SUCCESS + assert file_name in list_files() + assert response_data["messages"][0]["message"] == f"{file_name} downloaded to {IMAGES_DIR}" + + # Cleanup + delete_file(file_name) + + +# route("/files/download_to_iso", methods=["POST"]) +def test_download_url_to_iso( + httpserver, + http_client, + list_files, + list_attached_images, + detach_devices, + delete_file, +): + test_file_name = str(uuid.uuid4()) + iso_file_name = f"{test_file_name}.iso" + + http_path = f"/images/{test_file_name}" + url = httpserver.url_for(http_path) + + with open("tests/assets/test_image.hds", mode="rb") as file: + test_file_data = file.read() + + httpserver.expect_request(http_path).respond_with_data( + test_file_data, + mimetype="application/octet-stream", + ) + + response = http_client.post( + "/files/download_to_iso", + data={ + "scsi_id": SCSI_ID, + "type": "-hfs", + "url": url, + }, + ) + + response_data = response.json() + + assert response.status_code == 200 + assert response_data["status"] == STATUS_SUCCESS + assert iso_file_name in list_files() + assert iso_file_name in list_attached_images() + + m = response_data["messages"] + assert m[0]["message"] == 'Created CD-ROM ISO image with arguments "-hfs"' + assert m[1]["message"] == f"Saved image as: {IMAGES_DIR}/{iso_file_name}" + assert m[2]["message"] == f"Attached to SCSI ID {SCSI_ID}" + + # Cleanup + detach_devices() + delete_file(iso_file_name) + + +""" +NAMED DEVICES +""" + + +# route("/drive/list", methods=["GET"]) +def test_show_named_drive_presets(http_client): + response = http_client.get("/drive/list") + response_data = response.json() + + assert response.status_code == 200 + assert response_data["status"] == STATUS_SUCCESS + assert "cd_conf" in response_data["data"] + assert "hd_conf" in response_data["data"] + assert "rm_conf" in response_data["data"] + + +# route("/drive/cdrom", methods=["POST"]) +def test_create_cdrom_properties_file(http_client): + file_name = f"{uuid.uuid4()}.iso" + + response = http_client.post( + "/drive/cdrom", + data={ + "vendor": "TEST_AAA", + "product": "TEST_BBB", + "revision": "1.0A", + "block_size": 2048, + "file_name": file_name, + }, + ) + + response_data = response.json() + + assert response.status_code == 200 + assert response_data["status"] == STATUS_SUCCESS + assert response_data["messages"][0]["message"] == ( + f"File created: {CFG_DIR}/{file_name}.properties" + ) + + +# route("/drive/create", methods=["POST"]) +def test_create_image_with_properties_file(http_client, delete_file): + file_prefix = str(uuid.uuid4()) + file_name = f"{file_prefix}.hds" + + response = http_client.post( + "/drive/create", + data={ + "vendor": "TEST_AAA", + "product": "TEST_BBB", + "revision": "1.0A", + "block_size": 512, + "size": FILE_SIZE_1_MIB, + "file_type": "hds", + "file_name": file_prefix, + }, + ) + + response_data = response.json() + + assert response.status_code == 200 + assert response_data["status"] == STATUS_SUCCESS + assert response_data["messages"][0]["message"] == f"Image file created: {file_name}" + + # Cleanup + delete_file(file_name) + + +""" +INDEX & STATIC +""" + + +# route("/") +def test_index(http_client): + response = http_client.get("/") + response_data = response.json() + + assert response.status_code == 200 + assert response_data["status"] == STATUS_SUCCESS + assert "devices" in response_data["data"] + + +# route("/pwa/") +def test_pwa_route(http_client): + response = http_client.get("/pwa/favicon.ico") + + assert response.status_code == 200 + assert response.headers["content-disposition"] == "inline; filename=favicon.ico" + + +""" +SETTINGS +""" + + +# route("/language", methods=["POST"]) +@pytest.mark.parametrize( + "locale,confirm_message", + [ + ("de", "Webinterface-Sprache auf Deutsch geändert"), + ("es", "Se ha cambiado el lenguaje de la Interfaz Web a español"), + ("fr", "Langue de l’interface web changée pour français"), + ("sv", "Bytte webbgränssnittets språk till svenska"), + ("en", "Changed Web Interface language to English"), + ], +) +def test_set_language(http_client, locale, confirm_message): + response = http_client.post( + "/language", + data={ + "locale": locale, + }, + ) + + response_data = response.json() + + assert response.status_code == 200 + assert response_data["status"] == STATUS_SUCCESS + assert response_data["messages"][0]["message"] == confirm_message + + +# route("/logs/level", methods=["POST"]) +@pytest.mark.parametrize("level", ["trace", "debug", "info", "warn", "err", "critical", "off"]) +def test_set_log_level(http_client, level): + response = http_client.post( + "/logs/level", + data={ + "level": level, + }, + ) + + response_data = response.json() + + assert response.status_code == 200 + assert response_data["status"] == STATUS_SUCCESS + assert response_data["messages"][0]["message"] == f"Log level set to {level}" + + # Cleanup + http_client.post( + "/logs/level", + data={ + "level": "debug", + }, + ) + + +# route("/logs/show", methods=["POST"]) +def test_show_logs(http_client): + response = http_client.post( + "/logs/show", + data={ + "lines": 100, + "scope": "", + }, + ) + + assert response.status_code == 200 + assert response.text == "-- No entries --\n" + + +# route("/config/load", methods=["POST"]) +def test_save_load_and_delete_configs(http_client): + config_name = str(uuid.uuid4()) + config_json_file = f"{config_name}.json" + reserved_scsi_id = 0 + reservation_memo = str(uuid.uuid4()) + + # Confirm the initial state + assert http_client.get("/").json()["data"]["RESERVATIONS"][0] == "" + + # Save the initial state to a config + response = http_client.post( + "/config/save", + data={ + "name": config_name, + }, + ) + + response_data = response.json() + + assert response.status_code == 200 + assert response_data["status"] == STATUS_SUCCESS + assert response_data["messages"][0]["message"] == ( + f"File created: {CFG_DIR}/{config_json_file}" + ) + + assert config_json_file in http_client.get("/").json()["data"]["config_files"] + + # Modify the state + http_client.post( + "/scsi/reserve", + data={ + "scsi_id": reserved_scsi_id, + "memo": reservation_memo, + }, + ) + + assert http_client.get("/").json()["data"]["RESERVATIONS"][0] == reservation_memo + + # Load the saved config + response = http_client.post( + "/config/load", + data={ + "name": config_json_file, + "load": True, + }, + ) + + response_data = response.json() + + assert response.status_code == 200 + assert response_data["status"] == STATUS_SUCCESS + assert response_data["messages"][0]["message"] == ( + f"Loaded configurations from: {CFG_DIR}/{config_json_file}" + ) + + # Confirm the application has returned to its initial state + assert http_client.get("/").json()["data"]["RESERVATIONS"][0] == "" + + # Delete the saved config + response = http_client.post( + "/config/load", + data={ + "name": config_json_file, + "delete": True, + }, + ) + + response_data = response.json() + + assert response.status_code == 200 + assert response_data["status"] == STATUS_SUCCESS + assert response_data["messages"][0]["message"] == ( + f"File deleted: {CFG_DIR}/{config_json_file}" + ) + + assert config_json_file not in http_client.get("/").json()["data"]["config_files"] diff --git a/python/web/tests/assets/test_image.hds b/python/web/tests/assets/test_image.hds new file mode 100644 index 0000000000000000000000000000000000000000..9e0f96a2a253b173cb45b41868209a5d043e1437 GIT binary patch literal 1048576 zcmeIuF#!Mo0K%a4Pi+Wah(KY$fB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM z7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b* z1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd z0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwA zz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEj zFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r z3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@ z0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VK zfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5 zV8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM z7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b* z1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd z0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwA zz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEj zFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r z3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@ z0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VK zfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5 zV8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM z7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b* z1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd z0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwA zz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEj zFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r z3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@ z0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VK zfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5 zV8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM z7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b* z1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd z0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwA zz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEj zFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r z3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@ z0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VK zfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5 zV8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM z7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b* z1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd z0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwA zz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEj zFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r z3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@ z0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VK zfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5 zV8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM z7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b* z1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd z0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwA zz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEj zFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r z3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@ z0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VK zfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5 zV8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM z7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b* z1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd z0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwA zz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEj zFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r z3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@ z0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VK zfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5 zV8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM z7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b* z1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd z0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwA zz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEj zFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r z3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@ z0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VK zfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5 zV8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM z7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b* z1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd z0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwA zz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEj zFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r z3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@ z0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VK zfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5 zV8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM z7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b* z1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd z0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwA zz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEj zFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r z3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@ T0|pEjFkrxd0RsjM82APT0Pp|- literal 0 HcmV?d00001 diff --git a/python/web/tests/assets/test_image.hds.zip b/python/web/tests/assets/test_image.hds.zip new file mode 100644 index 0000000000000000000000000000000000000000..ecd49a5707850a4c2d4ebd42ce9aa3e71cd7e7c1 GIT binary patch literal 1239 zcmWIWW@Zs#-~hsdi}XVo;6Q+Zk3oT`~tXHqm0oI7!83D69RV30p9E!GOsL_ vbFzRuEx-`q&B!Fej3{Q1 Date: Mon, 19 Sep 2022 17:00:59 +0100 Subject: [PATCH 09/16] Expose env info to API clients --- python/web/src/web.py | 10 +++++++++- python/web/tests/api/test_json_api.py | 11 +++++++++++ 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/python/web/src/web.py b/python/web/src/web.py index 918d7310..26a1557b 100644 --- a/python/web/src/web.py +++ b/python/web/src/web.py @@ -260,6 +260,14 @@ def index(): ) +@APP.route("/env") +def env(): + """ + Shows information about the app/host environment + """ + return response(**get_env_info()) + + @APP.route("/drive/list", methods=["GET"]) def drive_list(): """ @@ -325,7 +333,7 @@ def login(): if AUTH_GROUP in groups: if authenticate(str(username), str(password)): session["username"] = request.form["username"] - return response() + return response(env=get_env_info()) return response(error=True, status_code=401, message=_( "You must log in with valid credentials for a user in the '%(group)s' group", diff --git a/python/web/tests/api/test_json_api.py b/python/web/tests/api/test_json_api.py index 5b2d7dcc..b6759ae7 100644 --- a/python/web/tests/api/test_json_api.py +++ b/python/web/tests/api/test_json_api.py @@ -98,6 +98,7 @@ def test_login_with_valid_credentials(pytestconfig, http_client_unauthenticated) assert response.status_code == 200 assert response_data["status"] == STATUS_SUCCESS + assert "env" in response_data["data"] # route("/login", methods=["POST"]) @@ -737,6 +738,16 @@ def test_index(http_client): assert "devices" in response_data["data"] +# route("/env") +def test_get_env_info(http_client): + response = http_client.get("/env") + response_data = response.json() + + assert response.status_code == 200 + assert response_data["status"] == STATUS_SUCCESS + assert "running_env" in response_data["data"] + + # route("/pwa/") def test_pwa_route(http_client): response = http_client.get("/pwa/favicon.ico") From 8062e5f5d7d9096854f52083c867694102ec3641 Mon Sep 17 00:00:00 2001 From: nucleogenic Date: Tue, 20 Sep 2022 00:33:47 +0100 Subject: [PATCH 10/16] Updates to allow tests to run against a remote RaSCSI instance --- python/web/tests/api/test_json_api.py | 2 +- python/web/tests/conftest.py | 17 +++++++++++++++-- 2 files changed, 16 insertions(+), 3 deletions(-) diff --git a/python/web/tests/api/test_json_api.py b/python/web/tests/api/test_json_api.py index b6759ae7..eb7d958e 100644 --- a/python/web/tests/api/test_json_api.py +++ b/python/web/tests/api/test_json_api.py @@ -823,7 +823,7 @@ def test_show_logs(http_client): ) assert response.status_code == 200 - assert response.text == "-- No entries --\n" + assert response.headers["content-type"] == "text/plain" # route("/config/load", methods=["POST"]) diff --git a/python/web/tests/conftest.py b/python/web/tests/conftest.py index f88526e5..da10c3f7 100644 --- a/python/web/tests/conftest.py +++ b/python/web/tests/conftest.py @@ -4,15 +4,28 @@ import requests def pytest_addoption(parser): parser.addoption("--base_url", action="store", default="http://localhost:8080") + parser.addoption("--httpserver_host", action="store", default="host.docker.internal") + parser.addoption("--httpserver_listen_address", action="store", default="127.0.0.1") parser.addoption("--rascsi_username", action="store", default="pi") parser.addoption("--rascsi_password", action="store", default="rascsi") +@pytest.fixture(scope="session") +def httpserver_listen_address(pytestconfig): + return (pytestconfig.getoption("httpserver_listen_address"), 0) + + @pytest.fixture(scope="function", autouse=True) -def set_httpserver_hostname(httpserver): +def set_httpserver_hostname(pytestconfig, httpserver): # The HTTP requests are made by Python from within the container so we need # httpserver.url_for to generate URLs which point to the Docker host - httpserver.host = "host.docker.internal" + httpserver.host = pytestconfig.getoption("httpserver_host") + + +@pytest.fixture(scope="session", autouse=True) +def ensure_all_devices_detached(create_http_client): + http_client = create_http_client() + http_client.post("/scsi/detach_all") @pytest.fixture(scope="session") From 65c2286ffedfeb2c8283fd5d4bc796cd46eb60df Mon Sep 17 00:00:00 2001 From: nucleogenic Date: Tue, 20 Sep 2022 01:56:22 +0100 Subject: [PATCH 11/16] Fix shell exit issue in web/start.sh --- python/web/start.sh | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/python/web/start.sh b/python/web/start.sh index f9e5b26b..c96d3ef3 100755 --- a/python/web/start.sh +++ b/python/web/start.sh @@ -62,8 +62,7 @@ if ! test -e venv; then pip3 install wheel pip3 install -r requirements.txt - git rev-parse --is-inside-work-tree &> /dev/null - if [[ $? -eq 0 ]]; then + if git rev-parse --is-inside-work-tree &> /dev/null; then git rev-parse HEAD > current fi fi From 663de0631e6b20974a6a550c99ec82bdc03274ac Mon Sep 17 00:00:00 2001 From: nucleogenic Date: Thu, 22 Sep 2022 21:41:12 +0100 Subject: [PATCH 12/16] Added tests for extracting .sit and .7z archive formats --- python/web/tests/api/test_json_api.py | 21 +++++++++++------- python/web/tests/assets/test_image.7z | Bin 0 -> 379 bytes python/web/tests/assets/test_image.sit | Bin 0 -> 536 bytes .../{test_image.hds.zip => test_image.zip} | Bin 1239 -> 1257 bytes 4 files changed, 13 insertions(+), 8 deletions(-) create mode 100644 python/web/tests/assets/test_image.7z create mode 100644 python/web/tests/assets/test_image.sit rename python/web/tests/assets/{test_image.hds.zip => test_image.zip} (76%) diff --git a/python/web/tests/api/test_json_api.py b/python/web/tests/api/test_json_api.py index eb7d958e..20ee404b 100644 --- a/python/web/tests/api/test_json_api.py +++ b/python/web/tests/api/test_json_api.py @@ -443,14 +443,19 @@ def test_delete_file(http_client, create_test_image, list_files): # route("/files/extract_image", methods=["POST"]) -# TODO: Add test files for all supported formats -def test_extract_file(httpserver, http_client, list_files, delete_file): - image_file_name = "test_image.hds" - zip_file_name = "test_image.hds.zip" - http_path = f"/images/{zip_file_name}" +@pytest.mark.parametrize( + "archive_file_name,image_file_name", + [ + ("test_image.zip", "test_image_from_zip.hds"), + ("test_image.sit", "test_image_from_sit.hds"), + ("test_image.7z", "test_image_from_7z.hds"), + ], +) +def test_extract_file(httpserver, http_client, list_files, delete_file, archive_file_name, image_file_name): + http_path = f"/images/{archive_file_name}" url = httpserver.url_for(http_path) - with open(f"tests/assets/{zip_file_name}", mode="rb") as file: + with open(f"tests/assets/{archive_file_name}", mode="rb") as file: zip_file_data = file.read() httpserver.expect_request(http_path).respond_with_data( @@ -468,7 +473,7 @@ def test_extract_file(httpserver, http_client, list_files, delete_file): response = http_client.post( "/files/extract_image", data={ - "archive_file": zip_file_name, + "archive_file": archive_file_name, "archive_members": image_file_name, }, ) @@ -481,7 +486,7 @@ def test_extract_file(httpserver, http_client, list_files, delete_file): assert image_file_name in list_files() # Cleanup - delete_file(zip_file_name) + delete_file(archive_file_name) delete_file(image_file_name) diff --git a/python/web/tests/assets/test_image.7z b/python/web/tests/assets/test_image.7z new file mode 100644 index 0000000000000000000000000000000000000000..f9af5b7a8d43082f76fb28c6420d5d9d9cef8a0f GIT binary patch literal 379 zcmXr7+Ou9=hJoew3l8>&3=mKSrL|YuvAqBPpW$XK14I7b|Nj?n|L<<+AzEaa7`S7S z#_6Wz6(5Q5Fn-5GirAwChp%k8M-xd9n`N zEAdQu^fD%R6U&Y7E4tat2Mc`Nc`i;AT(veqcGtO$>Jl-LZ7lpii4YaVmBo6=`MLVR5L5Jdxuh6a1sE8ZBp4VNiWnF{;4g zjFe&qMFs|_gA5Jyj7$vl4E4+{^wTnPQj7Id^2_sb@)J`S82Y1~JpCLqTyhH-85m`l z4}i>sfCeb-bnxno{r4`^PiJ|;75~fL#%4~4{vAh=TS1Mt_I0v}JHLv``hK)^g2;|P z2Q;L3WoADyRD0NGecLFt|GjGGs=!qr9KKFVnCkqd=|*d&Wbe$L7fxIP9DDA+OEErm zT=xA8b30+ruD1I#_+0zHoOv_#*o3O9`Qk+!n=RD8^9$}Xc%Y#DLp$Vxx3}d+#~0$0 zKkU}1f7j1*($;D2^OXI}ecO*sdzjfPcK`YNV?N4_OD3DFVsf{8-zNFw)_u3Yw~WFU zx&9@txtVQ#{;}Me{}pG#lASgkl2v1M4lBzDJjLFLc>iy-#1dv-JP90{{tF&h7vJ literal 0 HcmV?d00001 diff --git a/python/web/tests/assets/test_image.hds.zip b/python/web/tests/assets/test_image.zip similarity index 76% rename from python/web/tests/assets/test_image.hds.zip rename to python/web/tests/assets/test_image.zip index ecd49a5707850a4c2d4ebd42ce9aa3e71cd7e7c1..9d85ff2210051ffed8193829bf4e0c49213c7cda 100644 GIT binary patch literal 1257 zcmWIWW@Zs#-~hs%5@w+ca3H`S&Y-|hl3H96pP8GOo*JK4l%E@4m06&dky0EQ!pp$^ z?~r!#^}{;JXMni0f}4Sng2a>Ory8#>Au4S~@R7%?GW z#~k3z&LQ*4VmT)Z$kPH00p5&EBFu>L23ZbN-oU_;Mi7h4(kH;1l?|kw5eQR(bRO7p E0K*tJ#Q*>R delta 150 zcmaFKd7aZHz?+#xgn`o$|4BTCiBW-IVTH9FHqa$ZWd`PY|4{ME4UdLS-vtdFaWJ&U From f8e58708aae1ac58476fe9dbf7b8b1a0d695bdc8 Mon Sep 17 00:00:00 2001 From: nucleogenic Date: Sat, 24 Sep 2022 00:20:49 +0100 Subject: [PATCH 13/16] Update comments --- python/web/src/web.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/python/web/src/web.py b/python/web/src/web.py index 26a1557b..2af3d135 100644 --- a/python/web/src/web.py +++ b/python/web/src/web.py @@ -148,7 +148,7 @@ def get_locale(): def get_supported_locales(): """ - Returns a list of Locale objects that the Web Interfaces supports + Returns a list of languages supported by the web UI """ locales = [ {"language": x.language, "display_name": x.display_name} @@ -487,7 +487,7 @@ def show_logs(): lines = request.form.get("lines") scope = request.form.get("scope") - # TODO: Render logs in template + # TODO: Render logs in a template (issue #836) and structured JSON returncode, logs = sys_cmd.get_logs(lines, scope) if returncode == 0: headers = {"content-type": "text/plain"} @@ -537,9 +537,8 @@ def attach_device(): error_msg = _("Please follow the instructions at %(url)s", url=error_url) if "interface" in params.keys(): - # TODO: Can the behaviour of this function be made more intuitive? + # Note: is_bridge_configured returns False if the bridge is configured bridge_status = is_bridge_configured(params["interface"]) - # Error condition is truthy if bridge_status: return response(error=True, message=[ (bridge_status, "error"), @@ -699,6 +698,7 @@ def device_info(): # the one and only device that should have been returned device = devices["device_list"][0] if str(device["id"]) == scsi_id: + # TODO: Move the device info to the template instead of a flash message message = "\n".join([ _("DEVICE INFO"), "===========", From 6f7e611aa13162a9164cf4bc1abc47e329ee81d4 Mon Sep 17 00:00:00 2001 From: nucleogenic Date: Sat, 24 Sep 2022 03:10:11 +0100 Subject: [PATCH 14/16] Add missing dev dependency flake8 --- python/web/requirements-dev.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/python/web/requirements-dev.txt b/python/web/requirements-dev.txt index 1174a04e..b935690c 100644 --- a/python/web/requirements-dev.txt +++ b/python/web/requirements-dev.txt @@ -1,3 +1,4 @@ pytest==7.1.3 pytest-httpserver==1.0.6 black==22.8.0 +flake8==5.0.4 From a142af571d0b4d09fe38bab3c95ba2e20cace211 Mon Sep 17 00:00:00 2001 From: nucleogenic Date: Sat, 24 Sep 2022 03:10:01 +0100 Subject: [PATCH 15/16] Improve organisation of tests --- python/web/tests/api/conftest.py | 78 +++ python/web/tests/api/test_auth.py | 43 ++ python/web/tests/api/test_devices.py | 227 +++++++ python/web/tests/api/test_files.py | 317 +++++++++ python/web/tests/api/test_json_api.py | 910 -------------------------- python/web/tests/api/test_misc.py | 99 +++ python/web/tests/api/test_settings.py | 149 +++++ 7 files changed, 913 insertions(+), 910 deletions(-) create mode 100644 python/web/tests/api/conftest.py create mode 100644 python/web/tests/api/test_auth.py create mode 100644 python/web/tests/api/test_devices.py create mode 100644 python/web/tests/api/test_files.py delete mode 100644 python/web/tests/api/test_json_api.py create mode 100644 python/web/tests/api/test_misc.py create mode 100644 python/web/tests/api/test_settings.py diff --git a/python/web/tests/api/conftest.py b/python/web/tests/api/conftest.py new file mode 100644 index 00000000..76735679 --- /dev/null +++ b/python/web/tests/api/conftest.py @@ -0,0 +1,78 @@ +import pytest +import uuid + +CFG_DIR = "/home/pi/.config/rascsi" +IMAGES_DIR = "/home/pi/images" +AFP_DIR = "/home/pi/afpshare" +SCSI_ID = 6 +FILE_SIZE_1_MIB = 1048576 +STATUS_SUCCESS = "success" +STATUS_ERROR = "error" + + +@pytest.fixture(scope="function") +def create_test_image(request, http_client): + images = [] + + def create(image_type="hds", size=1, auto_delete=True): + file_prefix = str(uuid.uuid4()) + file_name = f"{file_prefix}.{image_type}" + + response = http_client.post( + "/files/create", + data={ + "file_name": file_prefix, + "type": image_type, + "size": size, + }, + ) + + if response.json()["status"] != STATUS_SUCCESS: + raise Exception("Failed to create temporary image") + + if auto_delete: + images.append(file_name) + + return file_name + + def delete(): + for image in images: + http_client.post("/files/delete", data={"file_name": image}) + + request.addfinalizer(delete) + return create + + +@pytest.fixture(scope="function") +def list_files(http_client): + def files(): + return [f["name"] for f in http_client.get("/").json()["data"]["files"]] + + return files + + +@pytest.fixture(scope="function") +def list_attached_images(http_client): + def files(): + return http_client.get("/").json()["data"]["attached_images"] + + return files + + +@pytest.fixture(scope="function") +def delete_file(http_client): + def delete(file_name): + http_client.post("/files/delete", data={"file_name": file_name}) + + return delete + + +@pytest.fixture(scope="function") +def detach_devices(http_client): + def detach(): + response = http_client.post("/scsi/detach_all") + if response.json()["status"] == STATUS_SUCCESS: + return True + raise Exception("Failed to detach SCSI devices") + + return detach diff --git a/python/web/tests/api/test_auth.py b/python/web/tests/api/test_auth.py new file mode 100644 index 00000000..f54e4897 --- /dev/null +++ b/python/web/tests/api/test_auth.py @@ -0,0 +1,43 @@ +from conftest import STATUS_SUCCESS, STATUS_ERROR + + +# route("/login", methods=["POST"]) +def test_login_with_valid_credentials(pytestconfig, http_client_unauthenticated): + response = http_client_unauthenticated.post( + "/login", + data={ + "username": pytestconfig.getoption("rascsi_username"), + "password": pytestconfig.getoption("rascsi_password"), + }, + ) + + response_data = response.json() + + assert response.status_code == 200 + assert response_data["status"] == STATUS_SUCCESS + assert "env" in response_data["data"] + + +# route("/login", methods=["POST"]) +def test_login_with_invalid_credentials(http_client_unauthenticated): + response = http_client_unauthenticated.post( + "/login", + data={ + "username": "__INVALID_USER__", + "password": "__INVALID_PASS__", + }, + ) + + response_data = response.json() + + assert response.status_code == 401 + assert response_data["status"] == STATUS_ERROR + assert response_data["messages"][0]["message"] == ( + "You must log in with valid credentials for a user in the 'rascsi' group" + ) + + +# route("/logout") +def test_logout(http_client): + response = http_client.get("/logout") + response.status_code == 200 diff --git a/python/web/tests/api/test_devices.py b/python/web/tests/api/test_devices.py new file mode 100644 index 00000000..3b3ff56e --- /dev/null +++ b/python/web/tests/api/test_devices.py @@ -0,0 +1,227 @@ +import pytest + +from conftest import ( + IMAGES_DIR, + SCSI_ID, + FILE_SIZE_1_MIB, + STATUS_SUCCESS, +) + + +# route("/scsi/attach", methods=["POST"]) +def test_attach_image(http_client, create_test_image, detach_devices): + test_image = create_test_image() + + response = http_client.post( + "/scsi/attach", + data={ + "file_name": test_image, + "file_size": FILE_SIZE_1_MIB, + "scsi_id": SCSI_ID, + "unit": 0, + "type": "SCHD", + }, + ) + response_data = response.json() + + assert response.status_code == 200 + assert response_data["status"] == STATUS_SUCCESS + assert response_data["messages"][0]["message"] == ( + f"Attached {test_image} as Hard Disk to SCSI ID {SCSI_ID} LUN 0" + ) + + # Cleanup + detach_devices() + + +# route("/scsi/attach_device", methods=["POST"]) +@pytest.mark.parametrize( + "device_name,device_config", + [ + # TODO: Fix networking in container, else SCBR attachment fails + # ("X68000 Host Bridge", {"type": "SCBR", "interface": "eth0", "inet": "10.10.20.1/24"}), + ("DaynaPORT SCSI/Link", {"type": "SCDP", "interface": "eth0", "inet": "10.10.20.1/24"}), + ("Host Services", {"type": "SCHS"}), + ("Printer", {"type": "SCLP", "timeout": 30, "cmd": "lp -oraw %f"}), + ], +) +def test_attach_device(http_client, detach_devices, device_name, device_config): + device_config["scsi_id"] = SCSI_ID + device_config["unit"] = 0 + + response = http_client.post( + "/scsi/attach_device", + data=device_config, + ) + + response_data = response.json() + + assert response.status_code == 200 + assert response_data["status"] == STATUS_SUCCESS + assert response_data["messages"][0]["message"] == ( + f"Attached {device_name} to SCSI ID {SCSI_ID} LUN 0" + ) + + # Cleanup + detach_devices() + + +# route("/scsi/detach", methods=["POST"]) +def test_detach_device(http_client, create_test_image): + test_image = create_test_image() + + http_client.post( + "/scsi/attach", + data={ + "file_name": test_image, + "file_size": FILE_SIZE_1_MIB, + "scsi_id": SCSI_ID, + "unit": 0, + "type": "SCHD", + }, + ) + + response = http_client.post( + "/scsi/detach", + data={ + "scsi_id": SCSI_ID, + "unit": 0, + }, + ) + + response_data = response.json() + + response.status_code == 200 + response_data["status"] == STATUS_SUCCESS + response_data["messages"][0]["message"] == f"Detached SCSI ID {SCSI_ID} LUN 0" + + +# route("/scsi/detach_all", methods=["POST"]) +def test_detach_all_devices(http_client, create_test_image, list_attached_images): + test_images = [] + scsi_ids = [4, 5, 6] + + for scsi_id in scsi_ids: + test_image = create_test_image() + test_images.append(test_image) + + http_client.post( + "/scsi/attach", + data={ + "file_name": test_image, + "file_size": FILE_SIZE_1_MIB, + "scsi_id": scsi_id, + "unit": 0, + "type": "SCHD", + }, + ) + + assert list_attached_images() == test_images + + response = http_client.post("/scsi/detach_all") + response_data = response.json() + + assert response.status_code == 200 + assert response_data["status"] == STATUS_SUCCESS + assert list_attached_images() == [] + + +# route("/scsi/eject", methods=["POST"]) +def test_eject_device(http_client, create_test_image, detach_devices): + test_image = create_test_image() + + http_client.post( + "/scsi/attach", + data={ + "file_name": test_image, + "file_size": FILE_SIZE_1_MIB, + "scsi_id": SCSI_ID, + "unit": 0, + "type": "SCCD", # CD-ROM + }, + ) + + response = http_client.post( + "/scsi/eject", + data={ + "scsi_id": SCSI_ID, + "unit": 0, + }, + ) + + response_data = response.json() + + assert response.status_code == 200 + assert response_data["status"] == STATUS_SUCCESS + assert response_data["messages"][0]["message"] == f"Ejected SCSI ID {SCSI_ID} LUN 0" + + # Cleanup + detach_devices() + + +# route("/scsi/info", methods=["POST"]) +def test_show_device_info(http_client, create_test_image, detach_devices): + test_image = create_test_image() + + http_client.post( + "/scsi/attach", + data={ + "file_name": test_image, + "file_size": FILE_SIZE_1_MIB, + "scsi_id": SCSI_ID, + "unit": 0, + "type": "SCHD", + }, + ) + + response = http_client.post( + "/scsi/info", + data={ + "scsi_id": SCSI_ID, + }, + ) + + response_data = response.json() + + assert response.status_code == 200 + assert response_data["status"] == STATUS_SUCCESS + assert "device_info" in response_data["data"] + assert response_data["data"]["device_info"]["file"] == f"{IMAGES_DIR}/{test_image}" + + # Cleanup + detach_devices() + + +# route("/scsi/reserve", methods=["POST"]) +# route("/scsi/release", methods=["POST"]) +def test_reserve_and_release_device(http_client): + scsi_id = 0 + + response = http_client.post( + "/scsi/reserve", + data={ + "scsi_id": scsi_id, + "memo": "TEST", + }, + ) + + response_data = response.json() + + assert response.status_code == 200 + assert response_data["status"] == STATUS_SUCCESS + assert response_data["messages"][0]["message"] == f"Reserved SCSI ID {scsi_id}" + + response = http_client.post( + "/scsi/release", + data={ + "scsi_id": scsi_id, + }, + ) + + response_data = response.json() + + assert response.status_code == 200 + assert response_data["status"] == STATUS_SUCCESS + assert response_data["messages"][0]["message"] == ( + f"Released the reservation for SCSI ID {scsi_id}" + ) diff --git a/python/web/tests/api/test_files.py b/python/web/tests/api/test_files.py new file mode 100644 index 00000000..cf0d96d9 --- /dev/null +++ b/python/web/tests/api/test_files.py @@ -0,0 +1,317 @@ +import pytest +import uuid +import os + +from conftest import ( + IMAGES_DIR, + AFP_DIR, + SCSI_ID, + FILE_SIZE_1_MIB, + STATUS_SUCCESS, +) + + +# route("/files/create", methods=["POST"]) +def test_create_file(http_client, list_files, delete_file): + file_prefix = str(uuid.uuid4()) + file_name = f"{file_prefix}.hds" + + response = http_client.post( + "/files/create", + data={ + "file_name": file_prefix, + "type": "hds", + "size": 1, + }, + ) + + response_data = response.json() + + assert response.status_code == 201 + assert response_data["status"] == STATUS_SUCCESS + assert response_data["data"]["image"] == file_name + assert response_data["messages"][0]["message"] == f"Image file created: {file_name}" + assert file_name in list_files() + + # Cleanup + delete_file(file_name) + + +# route("/files/rename", methods=["POST"]) +def test_rename_file(http_client, create_test_image, list_files, delete_file): + original_file = create_test_image(auto_delete=False) + renamed_file = f"{uuid.uuid4()}.rename" + + response = http_client.post( + "/files/rename", + data={"file_name": original_file, "new_file_name": renamed_file}, + ) + + response_data = response.json() + + assert response.status_code == 200 + assert response_data["status"] == STATUS_SUCCESS + assert response_data["messages"][0]["message"] == f"Image file renamed to: {renamed_file}" + assert renamed_file in list_files() + + # Cleanup + delete_file(renamed_file) + + +# route("/files/copy", methods=["POST"]) +def test_copy_file(http_client, create_test_image, list_files, delete_file): + original_file = create_test_image() + copy_file = f"{uuid.uuid4()}.copy" + + response = http_client.post( + "/files/copy", + data={ + "file_name": original_file, + "copy_file_name": copy_file, + }, + ) + + response_data = response.json() + files = list_files() + + assert response.status_code == 200 + assert response_data["status"] == STATUS_SUCCESS + assert response_data["messages"][0]["message"] == f"Copy of image file saved as: {copy_file}" + assert original_file in files + assert copy_file in files + + # Cleanup + delete_file(copy_file) + + +# route("/files/delete", methods=["POST"]) +def test_delete_file(http_client, create_test_image, list_files): + file_name = create_test_image() + + response = http_client.post("/files/delete", data={"file_name": file_name}) + + response_data = response.json() + + assert response.status_code == 200 + assert response_data["status"] == STATUS_SUCCESS + assert response_data["messages"][0]["message"] == f"Image file deleted: {file_name}" + assert file_name not in list_files() + + +# route("/files/extract_image", methods=["POST"]) +@pytest.mark.parametrize( + "archive_file_name,image_file_name", + [ + ("test_image.zip", "test_image_from_zip.hds"), + ("test_image.sit", "test_image_from_sit.hds"), + ("test_image.7z", "test_image_from_7z.hds"), + ], +) +def test_extract_file( + httpserver, http_client, list_files, delete_file, archive_file_name, image_file_name +): + http_path = f"/images/{archive_file_name}" + url = httpserver.url_for(http_path) + + with open(f"tests/assets/{archive_file_name}", mode="rb") as file: + zip_file_data = file.read() + + httpserver.expect_request(http_path).respond_with_data( + zip_file_data, + mimetype="application/octet-stream", + ) + + http_client.post( + "/files/download_to_images", + data={ + "url": url, + }, + ) + + response = http_client.post( + "/files/extract_image", + data={ + "archive_file": archive_file_name, + "archive_members": image_file_name, + }, + ) + + response_data = response.json() + + assert response.status_code == 200 + assert response_data["status"] == STATUS_SUCCESS + assert response_data["messages"][0]["message"] == "Extracted 1 file(s)" + assert image_file_name in list_files() + + # Cleanup + delete_file(archive_file_name) + delete_file(image_file_name) + + +# route("/files/upload", methods=["POST"]) +def test_upload_file(http_client, delete_file): + file_name = f"{uuid.uuid4()}.test" + + with open("tests/assets/test_image.hds", mode="rb") as file: + file.seek(0, os.SEEK_END) + file_size = file.tell() + file.seek(0, 0) + + number_of_chunks = 4 + + # Note: The test file needs to be cleanly divisible by the chunk size + chunk_size = int(file_size / number_of_chunks) + + for chunk_number in range(0, 4): + if chunk_number == 0: + chunk_byte_offset = 0 + else: + chunk_byte_offset = chunk_number * chunk_size + + form_data = { + "dzuuid": str(uuid.uuid4()), + "dzchunkindex": chunk_number, + "dzchunksize": chunk_size, + "dzchunkbyteoffset": chunk_byte_offset, + "dztotalfilesize": file_size, + "dztotalchunkcount": number_of_chunks, + } + + file_data = {"file": (file_name, file.read(chunk_size))} + + response = http_client.post( + "/files/upload", + data=form_data, + files=file_data, + ) + + assert response.status_code == 200 + assert response.text == "File upload successful!" + + file = [f for f in http_client.get("/").json()["data"]["files"] if f["name"] == file_name][0] + + assert file["size"] == file_size + + # Cleanup + delete_file(file_name) + + +# route("/files/download", methods=["POST"]) +def test_download_file(http_client, create_test_image): + file_name = create_test_image() + + response = http_client.post("/files/download", data={"file": f"{IMAGES_DIR}/{file_name}"}) + + assert response.status_code == 200 + assert response.headers["content-type"] == "application/octet-stream" + assert response.headers["content-disposition"] == f"attachment; filename={file_name}" + assert response.headers["content-length"] == str(FILE_SIZE_1_MIB) + + +# route("/files/download_to_afp", methods=["POST"]) +def test_download_url_to_afp_dir(httpserver, http_client): + file_name = str(uuid.uuid4()) + http_path = f"/images/{file_name}" + url = httpserver.url_for(http_path) + + with open("tests/assets/test_image.hds", mode="rb") as file: + file_data = file.read() + + httpserver.expect_request(http_path).respond_with_data( + file_data, + mimetype="application/octet-stream", + ) + + response = http_client.post( + "/files/download_to_afp", + data={ + "url": url, + }, + ) + + response_data = response.json() + + assert response.status_code == 200 + assert response_data["status"] == STATUS_SUCCESS + assert response_data["messages"][0]["message"] == f"{file_name} downloaded to {AFP_DIR}" + + +# route("/files/download_to_images", methods=["POST"]) +def test_download_url_to_images_dir(httpserver, http_client, list_files, delete_file): + file_name = str(uuid.uuid4()) + http_path = f"/images/{file_name}" + url = httpserver.url_for(http_path) + + with open("tests/assets/test_image.hds", mode="rb") as file: + test_file_data = file.read() + + httpserver.expect_request(http_path).respond_with_data( + test_file_data, + mimetype="application/octet-stream", + ) + + response = http_client.post( + "/files/download_to_images", + data={ + "url": url, + }, + ) + + response_data = response.json() + + assert response.status_code == 200 + assert response_data["status"] == STATUS_SUCCESS + assert file_name in list_files() + assert response_data["messages"][0]["message"] == f"{file_name} downloaded to {IMAGES_DIR}" + + # Cleanup + delete_file(file_name) + + +# route("/files/download_to_iso", methods=["POST"]) +def test_download_url_to_iso( + httpserver, + http_client, + list_files, + list_attached_images, + detach_devices, + delete_file, +): + test_file_name = str(uuid.uuid4()) + iso_file_name = f"{test_file_name}.iso" + + http_path = f"/images/{test_file_name}" + url = httpserver.url_for(http_path) + + with open("tests/assets/test_image.hds", mode="rb") as file: + test_file_data = file.read() + + httpserver.expect_request(http_path).respond_with_data( + test_file_data, + mimetype="application/octet-stream", + ) + + response = http_client.post( + "/files/download_to_iso", + data={ + "scsi_id": SCSI_ID, + "type": "-hfs", + "url": url, + }, + ) + + response_data = response.json() + + assert response.status_code == 200 + assert response_data["status"] == STATUS_SUCCESS + assert iso_file_name in list_files() + assert iso_file_name in list_attached_images() + + m = response_data["messages"] + assert m[0]["message"] == 'Created CD-ROM ISO image with arguments "-hfs"' + assert m[1]["message"] == f"Saved image as: {IMAGES_DIR}/{iso_file_name}" + assert m[2]["message"] == f"Attached to SCSI ID {SCSI_ID}" + + # Cleanup + detach_devices() + delete_file(iso_file_name) diff --git a/python/web/tests/api/test_json_api.py b/python/web/tests/api/test_json_api.py deleted file mode 100644 index 20ee404b..00000000 --- a/python/web/tests/api/test_json_api.py +++ /dev/null @@ -1,910 +0,0 @@ -import pytest -import uuid -import os - -CFG_DIR = "/home/pi/.config/rascsi" -IMAGES_DIR = "/home/pi/images" -AFP_DIR = "/home/pi/afpshare" -SCSI_ID = 6 -FILE_SIZE_1_MIB = 1048576 -STATUS_SUCCESS = "success" -STATUS_ERROR = "error" - - -@pytest.fixture(scope="function") -def create_test_image(request, http_client): - images = [] - - def create(image_type="hds", size=1, auto_delete=True): - file_prefix = str(uuid.uuid4()) - file_name = f"{file_prefix}.{image_type}" - - response = http_client.post( - "/files/create", - data={ - "file_name": file_prefix, - "type": image_type, - "size": size, - }, - ) - - if response.json()["status"] != STATUS_SUCCESS: - raise Exception("Failed to create temporary image") - - if auto_delete: - images.append(file_name) - - return file_name - - def delete(): - for image in images: - http_client.post("/files/delete", data={"file_name": image}) - - request.addfinalizer(delete) - return create - - -@pytest.fixture(scope="function") -def list_files(http_client): - def files(): - return [f["name"] for f in http_client.get("/").json()["data"]["files"]] - - return files - - -@pytest.fixture(scope="function") -def list_attached_images(http_client): - def files(): - return http_client.get("/").json()["data"]["attached_images"] - - return files - - -@pytest.fixture(scope="function") -def delete_file(http_client): - def delete(file_name): - http_client.post("/files/delete", data={"file_name": file_name}) - - return delete - - -@pytest.fixture(scope="function") -def detach_devices(http_client): - def detach(): - response = http_client.post("/scsi/detach_all") - if response.json()["status"] == STATUS_SUCCESS: - return True - raise Exception("Failed to detach SCSI devices") - - return detach - - -""" -AUTHENTICATION -""" - - -# route("/login", methods=["POST"]) -def test_login_with_valid_credentials(pytestconfig, http_client_unauthenticated): - response = http_client_unauthenticated.post( - "/login", - data={ - "username": pytestconfig.getoption("rascsi_username"), - "password": pytestconfig.getoption("rascsi_password"), - }, - ) - - response_data = response.json() - - assert response.status_code == 200 - assert response_data["status"] == STATUS_SUCCESS - assert "env" in response_data["data"] - - -# route("/login", methods=["POST"]) -def test_login_with_invalid_credentials(http_client_unauthenticated): - response = http_client_unauthenticated.post( - "/login", - data={ - "username": "__INVALID_USER__", - "password": "__INVALID_PASS__", - }, - ) - - response_data = response.json() - - assert response.status_code == 401 - assert response_data["status"] == STATUS_ERROR - assert response_data["messages"][0]["message"] == ( - "You must log in with valid credentials for a user in the 'rascsi' group" - ) - - -# route("/logout") -def test_logout(http_client): - response = http_client.get("/logout") - response.status_code == 200 - - -""" -DEVICE OPERATIONS -""" - - -# route("/scsi/attach", methods=["POST"]) -def test_attach_image(http_client, create_test_image, detach_devices): - test_image = create_test_image() - - response = http_client.post( - "/scsi/attach", - data={ - "file_name": test_image, - "file_size": FILE_SIZE_1_MIB, - "scsi_id": SCSI_ID, - "unit": 0, - "type": "SCHD", - }, - ) - response_data = response.json() - - assert response.status_code == 200 - assert response_data["status"] == STATUS_SUCCESS - assert response_data["messages"][0]["message"] == ( - f"Attached {test_image} as Hard Disk to SCSI ID {SCSI_ID} LUN 0" - ) - - # Cleanup - detach_devices() - - -# route("/scsi/attach_device", methods=["POST"]) -@pytest.mark.parametrize( - "device_name,device_config", - [ - # TODO: Fix networking in container, else SCBR attachment fails - # ("X68000 Host Bridge", {"type": "SCBR", "interface": "eth0", "inet": "10.10.20.1/24"}), - ("DaynaPORT SCSI/Link", {"type": "SCDP", "interface": "eth0", "inet": "10.10.20.1/24"}), - ("Host Services", {"type": "SCHS"}), - ("Printer", {"type": "SCLP", "timeout": 30, "cmd": "lp -oraw %f"}), - ], -) -def test_attach_device(http_client, detach_devices, device_name, device_config): - device_config["scsi_id"] = SCSI_ID - device_config["unit"] = 0 - - response = http_client.post( - "/scsi/attach_device", - data=device_config, - ) - - response_data = response.json() - - assert response.status_code == 200 - assert response_data["status"] == STATUS_SUCCESS - assert response_data["messages"][0]["message"] == ( - f"Attached {device_name} to SCSI ID {SCSI_ID} LUN 0" - ) - - # Cleanup - detach_devices() - - -# route("/scsi/detach", methods=["POST"]) -def test_detach_device(http_client, create_test_image): - test_image = create_test_image() - - http_client.post( - "/scsi/attach", - data={ - "file_name": test_image, - "file_size": FILE_SIZE_1_MIB, - "scsi_id": SCSI_ID, - "unit": 0, - "type": "SCHD", - }, - ) - - response = http_client.post( - "/scsi/detach", - data={ - "scsi_id": SCSI_ID, - "unit": 0, - }, - ) - - response_data = response.json() - - response.status_code == 200 - response_data["status"] == STATUS_SUCCESS - response_data["messages"][0]["message"] == f"Detached SCSI ID {SCSI_ID} LUN 0" - - -# route("/scsi/detach_all", methods=["POST"]) -def test_detach_all_devices(http_client, create_test_image, list_attached_images): - test_images = [] - scsi_ids = [4, 5, 6] - - for scsi_id in scsi_ids: - test_image = create_test_image() - test_images.append(test_image) - - http_client.post( - "/scsi/attach", - data={ - "file_name": test_image, - "file_size": FILE_SIZE_1_MIB, - "scsi_id": scsi_id, - "unit": 0, - "type": "SCHD", - }, - ) - - assert list_attached_images() == test_images - - response = http_client.post("/scsi/detach_all") - response_data = response.json() - - assert response.status_code == 200 - assert response_data["status"] == STATUS_SUCCESS - assert list_attached_images() == [] - - -# route("/scsi/eject", methods=["POST"]) -def test_eject_device(http_client, create_test_image, detach_devices): - test_image = create_test_image() - - http_client.post( - "/scsi/attach", - data={ - "file_name": test_image, - "file_size": FILE_SIZE_1_MIB, - "scsi_id": SCSI_ID, - "unit": 0, - "type": "SCCD", # CD-ROM - }, - ) - - response = http_client.post( - "/scsi/eject", - data={ - "scsi_id": SCSI_ID, - "unit": 0, - }, - ) - - response_data = response.json() - - assert response.status_code == 200 - assert response_data["status"] == STATUS_SUCCESS - assert response_data["messages"][0]["message"] == f"Ejected SCSI ID {SCSI_ID} LUN 0" - - # Cleanup - detach_devices() - - -# route("/scsi/info", methods=["POST"]) -def test_show_device_info(http_client, create_test_image, detach_devices): - test_image = create_test_image() - - http_client.post( - "/scsi/attach", - data={ - "file_name": test_image, - "file_size": FILE_SIZE_1_MIB, - "scsi_id": SCSI_ID, - "unit": 0, - "type": "SCHD", - }, - ) - - response = http_client.post( - "/scsi/info", - data={ - "scsi_id": SCSI_ID, - }, - ) - - response_data = response.json() - - assert response.status_code == 200 - assert response_data["status"] == STATUS_SUCCESS - assert "device_info" in response_data["data"] - assert response_data["data"]["device_info"]["file"] == f"{IMAGES_DIR}/{test_image}" - - # Cleanup - detach_devices() - - -# route("/scsi/reserve", methods=["POST"]) -# route("/scsi/release", methods=["POST"]) -def test_reserve_and_release_device(http_client): - scsi_id = 0 - - response = http_client.post( - "/scsi/reserve", - data={ - "scsi_id": scsi_id, - "memo": "TEST", - }, - ) - - response_data = response.json() - - assert response.status_code == 200 - assert response_data["status"] == STATUS_SUCCESS - assert response_data["messages"][0]["message"] == f"Reserved SCSI ID {scsi_id}" - - response = http_client.post( - "/scsi/release", - data={ - "scsi_id": scsi_id, - }, - ) - - response_data = response.json() - - assert response.status_code == 200 - assert response_data["status"] == STATUS_SUCCESS - assert response_data["messages"][0]["message"] == ( - f"Released the reservation for SCSI ID {scsi_id}" - ) - - -""" -FILE OPERATIONS -""" - - -# route("/files/create", methods=["POST"]) -def test_create_file(http_client, list_files, delete_file): - file_prefix = str(uuid.uuid4()) - file_name = f"{file_prefix}.hds" - - response = http_client.post( - "/files/create", - data={ - "file_name": file_prefix, - "type": "hds", - "size": 1, - }, - ) - - response_data = response.json() - - assert response.status_code == 201 - assert response_data["status"] == STATUS_SUCCESS - assert response_data["data"]["image"] == file_name - assert response_data["messages"][0]["message"] == f"Image file created: {file_name}" - assert file_name in list_files() - - # Cleanup - delete_file(file_name) - - -# route("/files/rename", methods=["POST"]) -def test_rename_file(http_client, create_test_image, list_files, delete_file): - original_file = create_test_image(auto_delete=False) - renamed_file = f"{uuid.uuid4()}.rename" - - response = http_client.post( - "/files/rename", - data={"file_name": original_file, "new_file_name": renamed_file}, - ) - - response_data = response.json() - - assert response.status_code == 200 - assert response_data["status"] == STATUS_SUCCESS - assert response_data["messages"][0]["message"] == f"Image file renamed to: {renamed_file}" - assert renamed_file in list_files() - - # Cleanup - delete_file(renamed_file) - - -# route("/files/copy", methods=["POST"]) -def test_copy_file(http_client, create_test_image, list_files, delete_file): - original_file = create_test_image() - copy_file = f"{uuid.uuid4()}.copy" - - response = http_client.post( - "/files/copy", - data={ - "file_name": original_file, - "copy_file_name": copy_file, - }, - ) - - response_data = response.json() - files = list_files() - - assert response.status_code == 200 - assert response_data["status"] == STATUS_SUCCESS - assert response_data["messages"][0]["message"] == f"Copy of image file saved as: {copy_file}" - assert original_file in files - assert copy_file in files - - # Cleanup - delete_file(copy_file) - - -# route("/files/delete", methods=["POST"]) -def test_delete_file(http_client, create_test_image, list_files): - file_name = create_test_image() - - response = http_client.post("/files/delete", data={"file_name": file_name}) - - response_data = response.json() - - assert response.status_code == 200 - assert response_data["status"] == STATUS_SUCCESS - assert response_data["messages"][0]["message"] == f"Image file deleted: {file_name}" - assert file_name not in list_files() - - -# route("/files/extract_image", methods=["POST"]) -@pytest.mark.parametrize( - "archive_file_name,image_file_name", - [ - ("test_image.zip", "test_image_from_zip.hds"), - ("test_image.sit", "test_image_from_sit.hds"), - ("test_image.7z", "test_image_from_7z.hds"), - ], -) -def test_extract_file(httpserver, http_client, list_files, delete_file, archive_file_name, image_file_name): - http_path = f"/images/{archive_file_name}" - url = httpserver.url_for(http_path) - - with open(f"tests/assets/{archive_file_name}", mode="rb") as file: - zip_file_data = file.read() - - httpserver.expect_request(http_path).respond_with_data( - zip_file_data, - mimetype="application/octet-stream", - ) - - http_client.post( - "/files/download_to_images", - data={ - "url": url, - }, - ) - - response = http_client.post( - "/files/extract_image", - data={ - "archive_file": archive_file_name, - "archive_members": image_file_name, - }, - ) - - response_data = response.json() - - assert response.status_code == 200 - assert response_data["status"] == STATUS_SUCCESS - assert response_data["messages"][0]["message"] == "Extracted 1 file(s)" - assert image_file_name in list_files() - - # Cleanup - delete_file(archive_file_name) - delete_file(image_file_name) - - -# route("/files/upload", methods=["POST"]) -def test_upload_file(http_client, delete_file): - file_name = f"{uuid.uuid4()}.test" - - with open("tests/assets/test_image.hds", mode="rb") as file: - file.seek(0, os.SEEK_END) - file_size = file.tell() - file.seek(0, 0) - - number_of_chunks = 4 - - # Note: The test file needs to be cleanly divisible by the chunk size - chunk_size = int(file_size / number_of_chunks) - - for chunk_number in range(0, 4): - if chunk_number == 0: - chunk_byte_offset = 0 - else: - chunk_byte_offset = chunk_number * chunk_size - - form_data = { - "dzuuid": str(uuid.uuid4()), - "dzchunkindex": chunk_number, - "dzchunksize": chunk_size, - "dzchunkbyteoffset": chunk_byte_offset, - "dztotalfilesize": file_size, - "dztotalchunkcount": number_of_chunks, - } - - file_data = {"file": (file_name, file.read(chunk_size))} - - response = http_client.post( - "/files/upload", - data=form_data, - files=file_data, - ) - - assert response.status_code == 200 - assert response.text == "File upload successful!" - - file = [f for f in http_client.get("/").json()["data"]["files"] if f["name"] == file_name][0] - - assert file["size"] == file_size - - # Cleanup - delete_file(file_name) - - -# route("/files/download", methods=["POST"]) -def test_download_file(http_client, create_test_image): - file_name = create_test_image() - - response = http_client.post("/files/download", data={"file": f"{IMAGES_DIR}/{file_name}"}) - - assert response.status_code == 200 - assert response.headers["content-type"] == "application/octet-stream" - assert response.headers["content-disposition"] == f"attachment; filename={file_name}" - assert response.headers["content-length"] == str(FILE_SIZE_1_MIB) - - -# route("/files/download_to_afp", methods=["POST"]) -def test_download_url_to_afp_dir(httpserver, http_client): - file_name = str(uuid.uuid4()) - http_path = f"/images/{file_name}" - url = httpserver.url_for(http_path) - - with open("tests/assets/test_image.hds", mode="rb") as file: - file_data = file.read() - - httpserver.expect_request(http_path).respond_with_data( - file_data, - mimetype="application/octet-stream", - ) - - response = http_client.post( - "/files/download_to_afp", - data={ - "url": url, - }, - ) - - response_data = response.json() - - assert response.status_code == 200 - assert response_data["status"] == STATUS_SUCCESS - assert response_data["messages"][0]["message"] == f"{file_name} downloaded to {AFP_DIR}" - - -# route("/files/download_to_images", methods=["POST"]) -def test_download_url_to_images_dir(httpserver, http_client, list_files, delete_file): - file_name = str(uuid.uuid4()) - http_path = f"/images/{file_name}" - url = httpserver.url_for(http_path) - - with open("tests/assets/test_image.hds", mode="rb") as file: - test_file_data = file.read() - - httpserver.expect_request(http_path).respond_with_data( - test_file_data, - mimetype="application/octet-stream", - ) - - response = http_client.post( - "/files/download_to_images", - data={ - "url": url, - }, - ) - - response_data = response.json() - - assert response.status_code == 200 - assert response_data["status"] == STATUS_SUCCESS - assert file_name in list_files() - assert response_data["messages"][0]["message"] == f"{file_name} downloaded to {IMAGES_DIR}" - - # Cleanup - delete_file(file_name) - - -# route("/files/download_to_iso", methods=["POST"]) -def test_download_url_to_iso( - httpserver, - http_client, - list_files, - list_attached_images, - detach_devices, - delete_file, -): - test_file_name = str(uuid.uuid4()) - iso_file_name = f"{test_file_name}.iso" - - http_path = f"/images/{test_file_name}" - url = httpserver.url_for(http_path) - - with open("tests/assets/test_image.hds", mode="rb") as file: - test_file_data = file.read() - - httpserver.expect_request(http_path).respond_with_data( - test_file_data, - mimetype="application/octet-stream", - ) - - response = http_client.post( - "/files/download_to_iso", - data={ - "scsi_id": SCSI_ID, - "type": "-hfs", - "url": url, - }, - ) - - response_data = response.json() - - assert response.status_code == 200 - assert response_data["status"] == STATUS_SUCCESS - assert iso_file_name in list_files() - assert iso_file_name in list_attached_images() - - m = response_data["messages"] - assert m[0]["message"] == 'Created CD-ROM ISO image with arguments "-hfs"' - assert m[1]["message"] == f"Saved image as: {IMAGES_DIR}/{iso_file_name}" - assert m[2]["message"] == f"Attached to SCSI ID {SCSI_ID}" - - # Cleanup - detach_devices() - delete_file(iso_file_name) - - -""" -NAMED DEVICES -""" - - -# route("/drive/list", methods=["GET"]) -def test_show_named_drive_presets(http_client): - response = http_client.get("/drive/list") - response_data = response.json() - - assert response.status_code == 200 - assert response_data["status"] == STATUS_SUCCESS - assert "cd_conf" in response_data["data"] - assert "hd_conf" in response_data["data"] - assert "rm_conf" in response_data["data"] - - -# route("/drive/cdrom", methods=["POST"]) -def test_create_cdrom_properties_file(http_client): - file_name = f"{uuid.uuid4()}.iso" - - response = http_client.post( - "/drive/cdrom", - data={ - "vendor": "TEST_AAA", - "product": "TEST_BBB", - "revision": "1.0A", - "block_size": 2048, - "file_name": file_name, - }, - ) - - response_data = response.json() - - assert response.status_code == 200 - assert response_data["status"] == STATUS_SUCCESS - assert response_data["messages"][0]["message"] == ( - f"File created: {CFG_DIR}/{file_name}.properties" - ) - - -# route("/drive/create", methods=["POST"]) -def test_create_image_with_properties_file(http_client, delete_file): - file_prefix = str(uuid.uuid4()) - file_name = f"{file_prefix}.hds" - - response = http_client.post( - "/drive/create", - data={ - "vendor": "TEST_AAA", - "product": "TEST_BBB", - "revision": "1.0A", - "block_size": 512, - "size": FILE_SIZE_1_MIB, - "file_type": "hds", - "file_name": file_prefix, - }, - ) - - response_data = response.json() - - assert response.status_code == 200 - assert response_data["status"] == STATUS_SUCCESS - assert response_data["messages"][0]["message"] == f"Image file created: {file_name}" - - # Cleanup - delete_file(file_name) - - -""" -INDEX & STATIC -""" - - -# route("/") -def test_index(http_client): - response = http_client.get("/") - response_data = response.json() - - assert response.status_code == 200 - assert response_data["status"] == STATUS_SUCCESS - assert "devices" in response_data["data"] - - -# route("/env") -def test_get_env_info(http_client): - response = http_client.get("/env") - response_data = response.json() - - assert response.status_code == 200 - assert response_data["status"] == STATUS_SUCCESS - assert "running_env" in response_data["data"] - - -# route("/pwa/") -def test_pwa_route(http_client): - response = http_client.get("/pwa/favicon.ico") - - assert response.status_code == 200 - assert response.headers["content-disposition"] == "inline; filename=favicon.ico" - - -""" -SETTINGS -""" - - -# route("/language", methods=["POST"]) -@pytest.mark.parametrize( - "locale,confirm_message", - [ - ("de", "Webinterface-Sprache auf Deutsch geändert"), - ("es", "Se ha cambiado el lenguaje de la Interfaz Web a español"), - ("fr", "Langue de l’interface web changée pour français"), - ("sv", "Bytte webbgränssnittets språk till svenska"), - ("en", "Changed Web Interface language to English"), - ], -) -def test_set_language(http_client, locale, confirm_message): - response = http_client.post( - "/language", - data={ - "locale": locale, - }, - ) - - response_data = response.json() - - assert response.status_code == 200 - assert response_data["status"] == STATUS_SUCCESS - assert response_data["messages"][0]["message"] == confirm_message - - -# route("/logs/level", methods=["POST"]) -@pytest.mark.parametrize("level", ["trace", "debug", "info", "warn", "err", "critical", "off"]) -def test_set_log_level(http_client, level): - response = http_client.post( - "/logs/level", - data={ - "level": level, - }, - ) - - response_data = response.json() - - assert response.status_code == 200 - assert response_data["status"] == STATUS_SUCCESS - assert response_data["messages"][0]["message"] == f"Log level set to {level}" - - # Cleanup - http_client.post( - "/logs/level", - data={ - "level": "debug", - }, - ) - - -# route("/logs/show", methods=["POST"]) -def test_show_logs(http_client): - response = http_client.post( - "/logs/show", - data={ - "lines": 100, - "scope": "", - }, - ) - - assert response.status_code == 200 - assert response.headers["content-type"] == "text/plain" - - -# route("/config/load", methods=["POST"]) -def test_save_load_and_delete_configs(http_client): - config_name = str(uuid.uuid4()) - config_json_file = f"{config_name}.json" - reserved_scsi_id = 0 - reservation_memo = str(uuid.uuid4()) - - # Confirm the initial state - assert http_client.get("/").json()["data"]["RESERVATIONS"][0] == "" - - # Save the initial state to a config - response = http_client.post( - "/config/save", - data={ - "name": config_name, - }, - ) - - response_data = response.json() - - assert response.status_code == 200 - assert response_data["status"] == STATUS_SUCCESS - assert response_data["messages"][0]["message"] == ( - f"File created: {CFG_DIR}/{config_json_file}" - ) - - assert config_json_file in http_client.get("/").json()["data"]["config_files"] - - # Modify the state - http_client.post( - "/scsi/reserve", - data={ - "scsi_id": reserved_scsi_id, - "memo": reservation_memo, - }, - ) - - assert http_client.get("/").json()["data"]["RESERVATIONS"][0] == reservation_memo - - # Load the saved config - response = http_client.post( - "/config/load", - data={ - "name": config_json_file, - "load": True, - }, - ) - - response_data = response.json() - - assert response.status_code == 200 - assert response_data["status"] == STATUS_SUCCESS - assert response_data["messages"][0]["message"] == ( - f"Loaded configurations from: {CFG_DIR}/{config_json_file}" - ) - - # Confirm the application has returned to its initial state - assert http_client.get("/").json()["data"]["RESERVATIONS"][0] == "" - - # Delete the saved config - response = http_client.post( - "/config/load", - data={ - "name": config_json_file, - "delete": True, - }, - ) - - response_data = response.json() - - assert response.status_code == 200 - assert response_data["status"] == STATUS_SUCCESS - assert response_data["messages"][0]["message"] == ( - f"File deleted: {CFG_DIR}/{config_json_file}" - ) - - assert config_json_file not in http_client.get("/").json()["data"]["config_files"] diff --git a/python/web/tests/api/test_misc.py b/python/web/tests/api/test_misc.py new file mode 100644 index 00000000..5a4f6f45 --- /dev/null +++ b/python/web/tests/api/test_misc.py @@ -0,0 +1,99 @@ +import uuid + +from conftest import ( + CFG_DIR, + FILE_SIZE_1_MIB, + STATUS_SUCCESS, +) + + +# route("/") +def test_index(http_client): + response = http_client.get("/") + response_data = response.json() + + assert response.status_code == 200 + assert response_data["status"] == STATUS_SUCCESS + assert "devices" in response_data["data"] + + +# route("/env") +def test_get_env_info(http_client): + response = http_client.get("/env") + response_data = response.json() + + assert response.status_code == 200 + assert response_data["status"] == STATUS_SUCCESS + assert "running_env" in response_data["data"] + + +# route("/pwa/") +def test_pwa_route(http_client): + response = http_client.get("/pwa/favicon.ico") + + assert response.status_code == 200 + assert response.headers["content-disposition"] == "inline; filename=favicon.ico" + + +# route("/drive/list", methods=["GET"]) +def test_show_named_drive_presets(http_client): + response = http_client.get("/drive/list") + response_data = response.json() + + assert response.status_code == 200 + assert response_data["status"] == STATUS_SUCCESS + assert "cd_conf" in response_data["data"] + assert "hd_conf" in response_data["data"] + assert "rm_conf" in response_data["data"] + + +# route("/drive/cdrom", methods=["POST"]) +def test_create_cdrom_properties_file(http_client): + file_name = f"{uuid.uuid4()}.iso" + + response = http_client.post( + "/drive/cdrom", + data={ + "vendor": "TEST_AAA", + "product": "TEST_BBB", + "revision": "1.0A", + "block_size": 2048, + "file_name": file_name, + }, + ) + + response_data = response.json() + + assert response.status_code == 200 + assert response_data["status"] == STATUS_SUCCESS + assert response_data["messages"][0]["message"] == ( + f"File created: {CFG_DIR}/{file_name}.properties" + ) + + +# route("/drive/create", methods=["POST"]) +def test_create_image_with_properties_file(http_client, delete_file): + file_prefix = str(uuid.uuid4()) + file_name = f"{file_prefix}.hds" + + response = http_client.post( + "/drive/create", + data={ + "vendor": "TEST_AAA", + "product": "TEST_BBB", + "revision": "1.0A", + "block_size": 512, + "size": FILE_SIZE_1_MIB, + "file_type": "hds", + "file_name": file_prefix, + }, + ) + + response_data = response.json() + + assert response.status_code == 200 + assert response_data["status"] == STATUS_SUCCESS + assert response_data["messages"][0]["message"] == f"Image file created: {file_name}" + + # Cleanup + delete_file(file_name) diff --git a/python/web/tests/api/test_settings.py b/python/web/tests/api/test_settings.py new file mode 100644 index 00000000..9e7c276a --- /dev/null +++ b/python/web/tests/api/test_settings.py @@ -0,0 +1,149 @@ +import pytest +import uuid + +from conftest import CFG_DIR, STATUS_SUCCESS + + +# route("/language", methods=["POST"]) +@pytest.mark.parametrize( + "locale,confirm_message", + [ + ("de", "Webinterface-Sprache auf Deutsch geändert"), + ("es", "Se ha cambiado el lenguaje de la Interfaz Web a español"), + ("fr", "Langue de l’interface web changée pour français"), + ("sv", "Bytte webbgränssnittets språk till svenska"), + ("en", "Changed Web Interface language to English"), + ], +) +def test_set_language(http_client, locale, confirm_message): + response = http_client.post( + "/language", + data={ + "locale": locale, + }, + ) + + response_data = response.json() + + assert response.status_code == 200 + assert response_data["status"] == STATUS_SUCCESS + assert response_data["messages"][0]["message"] == confirm_message + + +# route("/logs/level", methods=["POST"]) +@pytest.mark.parametrize("level", ["trace", "debug", "info", "warn", "err", "critical", "off"]) +def test_set_log_level(http_client, level): + response = http_client.post( + "/logs/level", + data={ + "level": level, + }, + ) + + response_data = response.json() + + assert response.status_code == 200 + assert response_data["status"] == STATUS_SUCCESS + assert response_data["messages"][0]["message"] == f"Log level set to {level}" + + # Cleanup + http_client.post( + "/logs/level", + data={ + "level": "debug", + }, + ) + + +# route("/logs/show", methods=["POST"]) +def test_show_logs(http_client): + response = http_client.post( + "/logs/show", + data={ + "lines": 100, + "scope": "", + }, + ) + + assert response.status_code == 200 + assert response.headers["content-type"] == "text/plain" + + +# route("/config/save", methods=["POST"]) +# route("/config/load", methods=["POST"]) +def test_save_load_and_delete_configs(http_client): + config_name = str(uuid.uuid4()) + config_json_file = f"{config_name}.json" + reserved_scsi_id = 0 + reservation_memo = str(uuid.uuid4()) + + # Confirm the initial state + assert http_client.get("/").json()["data"]["RESERVATIONS"][0] == "" + + # Save the initial state to a config + response = http_client.post( + "/config/save", + data={ + "name": config_name, + }, + ) + + response_data = response.json() + + assert response.status_code == 200 + assert response_data["status"] == STATUS_SUCCESS + assert response_data["messages"][0]["message"] == ( + f"File created: {CFG_DIR}/{config_json_file}" + ) + + assert config_json_file in http_client.get("/").json()["data"]["config_files"] + + # Modify the state + http_client.post( + "/scsi/reserve", + data={ + "scsi_id": reserved_scsi_id, + "memo": reservation_memo, + }, + ) + + assert http_client.get("/").json()["data"]["RESERVATIONS"][0] == reservation_memo + + # Load the saved config + response = http_client.post( + "/config/load", + data={ + "name": config_json_file, + "load": True, + }, + ) + + response_data = response.json() + + assert response.status_code == 200 + assert response_data["status"] == STATUS_SUCCESS + assert response_data["messages"][0]["message"] == ( + f"Loaded configurations from: {CFG_DIR}/{config_json_file}" + ) + + # Confirm the application has returned to its initial state + assert http_client.get("/").json()["data"]["RESERVATIONS"][0] == "" + + # Delete the saved config + response = http_client.post( + "/config/load", + data={ + "name": config_json_file, + "delete": True, + }, + ) + + response_data = response.json() + + assert response.status_code == 200 + assert response_data["status"] == STATUS_SUCCESS + assert response_data["messages"][0]["message"] == ( + f"File deleted: {CFG_DIR}/{config_json_file}" + ) + + assert config_json_file not in http_client.get("/").json()["data"]["config_files"] From 75b0994b77864f0bf98dfa9dde59965119905967 Mon Sep 17 00:00:00 2001 From: nucleogenic Date: Sun, 25 Sep 2022 18:15:35 +0100 Subject: [PATCH 16/16] Add missing asserts to API tests --- python/web/tests/api/test_auth.py | 2 +- python/web/tests/api/test_devices.py | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/python/web/tests/api/test_auth.py b/python/web/tests/api/test_auth.py index f54e4897..3a720ea9 100644 --- a/python/web/tests/api/test_auth.py +++ b/python/web/tests/api/test_auth.py @@ -40,4 +40,4 @@ def test_login_with_invalid_credentials(http_client_unauthenticated): # route("/logout") def test_logout(http_client): response = http_client.get("/logout") - response.status_code == 200 + assert response.status_code == 200 diff --git a/python/web/tests/api/test_devices.py b/python/web/tests/api/test_devices.py index 3b3ff56e..640f20bd 100644 --- a/python/web/tests/api/test_devices.py +++ b/python/web/tests/api/test_devices.py @@ -91,9 +91,9 @@ def test_detach_device(http_client, create_test_image): response_data = response.json() - response.status_code == 200 - response_data["status"] == STATUS_SUCCESS - response_data["messages"][0]["message"] == f"Detached SCSI ID {SCSI_ID} LUN 0" + assert response.status_code == 200 + assert response_data["status"] == STATUS_SUCCESS + assert response_data["messages"][0]["message"] == f"Detached SCSI ID {SCSI_ID} LUN 0" # route("/scsi/detach_all", methods=["POST"])