import logging from flask import ( Flask, render_template, request, flash, url_for, redirect, send_file, send_from_directory, make_response, ) from file_cmds import ( list_files, list_config_files, create_new_image, download_file_to_iso, delete_file, unzip_file, download_image, write_config, read_config, write_drive_properties, read_drive_properties, ) from pi_cmds import ( shutdown_pi, reboot_pi, running_env, rascsi_service, is_bridge_setup, disk_space, ) from ractl_cmds import ( attach_image, list_devices, sort_and_format_devices, detach_by_id, eject_by_id, get_valid_scsi_ids, detach_all, get_server_info, get_network_info, get_device_types, validate_scsi_id, set_log_level, ) from settings import * app = Flask(__name__) @app.route("/") def index(): server_info = get_server_info() disk = disk_space() devices = list_devices() device_types=get_device_types() files=list_files() config_files=list_config_files() sorted_image_files = sorted(files["files"], key = lambda x: x["name"].lower()) sorted_config_files = sorted(config_files, key = lambda x: x.lower()) reserved_scsi_ids = server_info["reserved_ids"] formatted_devices = sort_and_format_devices(devices["device_list"]) scsi_ids = get_valid_scsi_ids(devices["device_list"], reserved_scsi_ids) return render_template( "index.html", bridge_configured=is_bridge_setup(), devices=formatted_devices, files=sorted_image_files, config_files=sorted_config_files, base_dir=base_dir, scsi_ids=scsi_ids, reserved_scsi_ids=reserved_scsi_ids, max_file_size=int(MAX_FILE_SIZE / 1024 / 1024), running_env=running_env(), server_info=server_info, netinfo=get_network_info(), device_types=device_types["device_types"], free_disk=int(disk["free"] / 1024 / 1024), valid_file_suffix="."+", .".join(VALID_FILE_SUFFIX), removable_device_types=REMOVABLE_DEVICE_TYPES, harddrive_file_suffix=HARDDRIVE_FILE_SUFFIX, cdrom_file_suffix=CDROM_FILE_SUFFIX, removable_file_suffix=REMOVABLE_FILE_SUFFIX, archive_file_suffix=ARCHIVE_FILE_SUFFIX, ) @app.route("/drive/list", methods=["GET"]) def drive_list(): """ Sets up the data structures and kicks off the rendering of the drive list page """ server_info = get_server_info() disk = disk_space() # Reads the canonical drive properties into a dict # The file resides in the current dir of the web ui process from pathlib import Path drive_properties = Path(DRIVE_PROPERTIES_FILE) if drive_properties.is_file(): process = read_drive_properties(str(drive_properties)) if process["status"] == False: flash(process["msg"], "error") return redirect(url_for("index")) conf = process["conf"] else: flash("Could not read drive properties from " + str(drive_properties), "error") return redirect(url_for("index")) hd_conf = [] cd_conf = [] rm_conf = [] from werkzeug.utils import secure_filename for d in conf: if d["device_type"] == "SCHD": d["secure_name"] = secure_filename(d["name"]) d["size_mb"] = "{:,.2f}".format(d["size"] / 1024 / 1024) hd_conf.append(d) elif d["device_type"] == "SCCD": d["size_mb"] = "N/A" cd_conf.append(d) elif d["device_type"] == "SCRM": d["secure_name"] = secure_filename(d["name"]) d["size_mb"] = "{:,.2f}".format(d["size"] / 1024 / 1024) rm_conf.append(d) files=list_files() sorted_image_files = sorted(files["files"], key = lambda x: x["name"].lower()) hd_conf = sorted(hd_conf, key = lambda x: x["name"].lower()) cd_conf = sorted(cd_conf, key = lambda x: x["name"].lower()) rm_conf = sorted(rm_conf, key = lambda x: x["name"].lower()) return render_template( "drives.html", files=sorted_image_files, base_dir=base_dir, hd_conf=hd_conf, cd_conf=cd_conf, rm_conf=rm_conf, running_env=running_env(), server_info=server_info, free_disk=int(disk["free"] / 1024 / 1024), cdrom_file_suffix=CDROM_FILE_SUFFIX, ) @app.route('/pwa/') def send_pwa_files(path): return send_from_directory('pwa', path) @app.route("/drive/create", methods=["POST"]) def drive_create(): vendor = request.form.get("vendor") product = request.form.get("product") revision = request.form.get("revision") block_size = request.form.get("block_size") size = request.form.get("size") file_type = request.form.get("file_type") file_name = request.form.get("file_name") # Creating the image file process = create_new_image(file_name, file_type, size) if process["status"] == True: flash(f"Drive image file {file_name}.{file_type} created") flash(process["msg"]) else: flash(f"Failed to create file {file_name}.{file_type}", "error") flash(process["msg"], "error") return redirect(url_for("index")) # Creating the drive properties file from pathlib import Path file_name = str(Path(file_name).stem) + "." + PROPERTIES_SUFFIX properties = {"vendor": vendor, "product": product, \ "revision": revision, "block_size": block_size} process = write_drive_properties(file_name, properties) if process["status"] == True: flash(f"Drive properties file {file_name} created") return redirect(url_for("index")) else: flash(f"Failed to create drive properties file {file_name}", "error") return redirect(url_for("index")) @app.route("/drive/cdrom", methods=["POST"]) def drive_cdrom(): vendor = request.form.get("vendor") product = request.form.get("product") revision = request.form.get("revision") block_size = request.form.get("block_size") file_name = request.form.get("file_name") # Creating the drive properties file from pathlib import Path file_name = str(Path(file_name).stem) + "." + PROPERTIES_SUFFIX properties = {"vendor": vendor, "product": product, "revision": revision, "block_size": block_size} process = write_drive_properties(file_name, properties) if process["status"] == True: flash(f"Drive properties file {file_name} created") return redirect(url_for("index")) else: flash(f"Failed to create drive properties file {file_name}", "error") return redirect(url_for("index")) @app.route("/config/save", methods=["POST"]) def config_save(): file_name = request.form.get("name") or "default" file_name = f"{file_name}.json" process = write_config(file_name) if process["status"] == True: flash(f"Saved config to {file_name}!") flash(process["msg"]) return redirect(url_for("index")) else: flash(f"Failed to saved config to {file_name}!", "error") flash(process['msg'], "error") return redirect(url_for("index")) @app.route("/config/load", methods=["POST"]) def config_load(): file_name = request.form.get("name") if "load" in request.form: process = read_config(file_name) if process["status"] == True: flash(f"Loaded config from {file_name}!") flash(process["msg"]) return redirect(url_for("index")) else: flash(f"Failed to load {file_name}!", "error") flash(process['msg'], "error") return redirect(url_for("index")) elif "delete" in request.form: process = delete_file(file_name) if process["status"] == True: flash(f"Deleted config {file_name}!") flash(process["msg"]) return redirect(url_for("index")) else: flash(f"Failed to delete {file_name}!", "error") flash(process['msg'], "error") return redirect(url_for("index")) @app.route("/logs/show", methods=["POST"]) def show_logs(): lines = request.form.get("lines") or "200" scope = request.form.get("scope") or "default" from subprocess import run if scope != "default": process = run(["journalctl", "-n", lines, "-u", scope], capture_output=True) else: process = run(["journalctl", "-n", lines], capture_output=True) if process.returncode == 0: headers = {"content-type": "text/plain"} return process.stdout.decode("utf-8"), int(lines), headers else: flash("Failed to get logs") flash(process.stdout.decode("utf-8"), "stdout") flash(process.stderr.decode("utf-8"), "stderr") return redirect(url_for("index")) @app.route("/logs/level", methods=["POST"]) def log_level(): level = request.form.get("level") or "info" process = set_log_level(level) if process["status"] == True: flash(f"Log level set to {level}!") return redirect(url_for("index")) else: flash(f"Failed to set log level to {level}!", "error") flash(process["msg"], "error") return redirect(url_for("index")) @app.route("/daynaport/attach", methods=["POST"]) def daynaport_attach(): scsi_id = request.form.get("scsi_id") interface = request.form.get("if") ip = request.form.get("ip") mask = request.form.get("mask") kwargs = {"device_type": "SCDP"} if interface != "": arg = interface if "" not in (ip, mask): arg += (":" + ip + "/" + mask) kwargs["interfaces"] = arg validate = validate_scsi_id(scsi_id) if validate["status"] == False: flash(validate["msg"], "error") return redirect(url_for("index")) process = attach_image(scsi_id, **kwargs) if process["status"] == True: flash(f"Attached DaynaPORT to SCSI id {scsi_id}!") return redirect(url_for("index")) else: flash(f"Failed to attach DaynaPORT to SCSI id {scsi_id}!", "error") flash(process["msg"], "error") return redirect(url_for("index")) @app.route("/scsi/attach", methods=["POST"]) def attach(): file_name = request.form.get("file_name") file_size = request.form.get("file_size") scsi_id = request.form.get("scsi_id") device_type = request.form.get("type") validate = validate_scsi_id(scsi_id) if validate["status"] == False: flash(validate["msg"], "error") return redirect(url_for("index")) kwargs = {"image": file_name} if device_type != "": kwargs["device_type"] = device_type # Attempt to load the device properties file: # same base path but PROPERTIES_SUFFIX instead of the original suffix. from pathlib import Path file_name_base = str(Path(file_name).stem) drive_properties = Path(base_dir + file_name_base + "." + PROPERTIES_SUFFIX) if drive_properties.is_file(): process = read_drive_properties(str(drive_properties)) if process["status"] == False: flash(f"Failed to load the device properties file {file_name_base}.{PROPERTIES_SUFFIX}", "error") flash(process["msg"], "error") return redirect(url_for("index")) conf = process["conf"] kwargs["vendor"] = conf["vendor"] kwargs["product"] = conf["product"] kwargs["revision"] = conf["revision"] kwargs["block_size"] = conf["block_size"] process = attach_image(scsi_id, **kwargs) if process["status"] == True: flash(f"Attached {file_name} to SCSI id {scsi_id}!") return redirect(url_for("index")) else: flash(f"Failed to attach {file_name} to SCSI id {scsi_id}!", "error") flash(process["msg"], "error") return redirect(url_for("index")) @app.route("/scsi/detach_all", methods=["POST"]) def detach_all_devices(): process = detach_all() if process["status"] == True: flash("Detached all SCSI devices!") return redirect(url_for("index")) else: flash("Failed to detach all SCSI devices!", "error") flash(process["msg"], "error") return redirect(url_for("index")) @app.route("/scsi/detach", methods=["POST"]) def detach(): scsi_id = request.form.get("scsi_id") process = detach_by_id(scsi_id) if process["status"] == True: flash(f"Detached SCSI id {scsi_id}!") return redirect(url_for("index")) else: flash(f"Failed to detach SCSI id {scsi_id}!", "error") flash(process["msg"], "error") return redirect(url_for("index")) @app.route("/scsi/eject", methods=["POST"]) def eject(): scsi_id = request.form.get("scsi_id") process = eject_by_id(scsi_id) if process["status"] == True: flash(f"Ejected scsi id {scsi_id}!") return redirect(url_for("index")) else: flash(f"Failed to eject SCSI id {scsi_id}!", "error") flash(process["msg"], "error") return redirect(url_for("index")) @app.route("/scsi/info", methods=["POST"]) def device_info(): scsi_id = request.form.get("scsi_id") devices = list_devices(scsi_id) # First check if any device at all was returned if devices["status"] == False: flash(f"No device attached to SCSI id {scsi_id}!", "error") return redirect(url_for("index")) # Looking at the first dict in list to get # the one and only device that should have been returned device = devices["device_list"][0] if str(device["id"]) == scsi_id: flash("=== DEVICE INFO ===") flash(f"SCSI ID: {device['id']}") flash(f"Unit: {device['un']}") flash(f"Type: {device['device_type']}") flash(f"Status: {device['status']}") flash(f"File: {device['image']}") flash(f"Parameters: {device['params']}") flash(f"Vendor: {device['vendor']}") flash(f"Product: {device['product']}") flash(f"Revision: {device['revision']}") flash(f"Block Size: {device['block_size']}") return redirect(url_for("index")) else: flash(f"Failed to get device info for SCSI id {scsi_id}!", "error") return redirect(url_for("index")) @app.route("/pi/reboot", methods=["POST"]) def restart(): flash("Restarting the Pi momentarily...") reboot_pi() return redirect(url_for("index")) @app.route("/rascsi/restart", methods=["POST"]) def rascsi_restart(): rascsi_service("restart") flash("Restarting RaSCSI Service...") return redirect(url_for("index")) @app.route("/pi/shutdown", methods=["POST"]) def shutdown(): flash("Shutting down the Pi momentarily...") shutdown_pi() return redirect(url_for("index")) @app.route("/files/download_to_iso", methods=["POST"]) def download_file(): scsi_id = request.form.get("scsi_id") validate = validate_scsi_id(scsi_id) if validate["status"] == False: flash(validate["msg"], "error") return redirect(url_for("index")) url = request.form.get("url") process = download_file_to_iso(scsi_id, url) if process["status"] == True: flash(f"File Downloaded and Attached to SCSI id {scsi_id}") flash(process["msg"]) return redirect(url_for("index")) else: flash(f"Failed to download and attach file {url}", "error") flash(process["msg"], "error") return redirect(url_for("index")) @app.route("/files/download_image", methods=["POST"]) def download_img(): url = request.form.get("url") process = download_image(url) if process["status"] == True: flash(f"File Downloaded from {url}") return redirect(url_for("index")) else: flash(f"Failed to download file {url}", "error") flash(process["msg"], "error") return redirect(url_for("index")) @app.route("/files/upload", methods=["POST"]) def upload_file(): from werkzeug.utils import secure_filename from os import path import pydrop log = logging.getLogger("pydrop") file = request.files["file"] filename = secure_filename(file.filename) save_path = path.join(app.config["UPLOAD_FOLDER"], filename) current_chunk = int(request.form['dzchunkindex']) # Makes sure not to overwrite an existing file, # but continues writing to a file transfer in progress if path.exists(save_path) and current_chunk == 0: return make_response((f"The file {file.filename} already exists!", 400)) try: with open(save_path, "ab") as f: f.seek(int(request.form["dzchunkbyteoffset"])) f.write(file.stream.read()) except OSError: log.exception("Could not write to file") return make_response(("Unable to write the file to disk!", 500)) total_chunks = int(request.form["dztotalchunkcount"]) if current_chunk + 1 == total_chunks: # Validate the resulting file size after writing the last chunk if path.getsize(save_path) != int(request.form["dztotalfilesize"]): log.error(f"Finished transferring {file.filename}, " f"but it has a size mismatch with the original file." f"Got {path.getsize(save_path)} but we " f"expected {request.form['dztotalfilesize']}.") return make_response(("Transferred file corrupted!", 500)) else: log.info(f"File {file.filename} has been uploaded successfully") if filename.lower().endswith(".zip"): unzip_file(filename) else: log.debug(f"Chunk {current_chunk + 1} of {total_chunks} " f"for file {file.filename} completed.") return make_response(("File upload successful!", 200)) @app.route("/files/create", methods=["POST"]) def create_file(): file_name = request.form.get("file_name") size = (int(request.form.get("size")) * 1024 * 1024) file_type = request.form.get("type") from werkzeug.utils import secure_filename file_name = secure_filename(file_name) process = create_new_image(file_name, file_type, size) if process["status"] == True: flash(f"Drive image created as {file_name}.{file_type}") flash(process["msg"]) return redirect(url_for("index")) else: flash(f"Failed to create file {file_name}.{file_type}", "error") flash(process["msg"], "error") return redirect(url_for("index")) @app.route("/files/download", methods=["POST"]) def download(): image = request.form.get("image") return send_file(base_dir + image, as_attachment=True) @app.route("/files/delete", methods=["POST"]) def delete(): file_name = request.form.get("image") process = delete_file(file_name) if process["status"] == True: flash(f"File {file_name} deleted!") flash(process["msg"]) else: flash(f"Failed to delete file {file_name}!", "error") flash(process["msg"], "error") return redirect(url_for("index")) # Delete the drive properties file, if it exists from pathlib import Path file_name = str(Path(file_name).stem) + "." + PROPERTIES_SUFFIX file_path = Path(base_dir + file_name) if file_path.is_file(): process = delete_file(file_name) if process["status"] == True: flash(f"File {file_name} deleted!") flash(process["msg"]) return redirect(url_for("index")) else: flash(f"Failed to delete file {file_name}!", "error") flash(process["msg"], "error") return redirect(url_for("index")) return redirect(url_for("index")) if __name__ == "__main__": app.secret_key = "rascsi_is_awesome_insecure_secret_key" app.config["SESSION_TYPE"] = "filesystem" app.config["UPLOAD_FOLDER"] = base_dir from os import makedirs makedirs(app.config["UPLOAD_FOLDER"], exist_ok=True) app.config["MAX_CONTENT_LENGTH"] = MAX_FILE_SIZE # Load the default configuration file, if found from pathlib import Path default_config_path = Path(base_dir + DEFAULT_CONFIG) if default_config_path.is_file(): read_config(DEFAULT_CONFIG) import bjoern print("Serving rascsi-web...") bjoern.run(app, "0.0.0.0", 8080)