From 943a9ab5374f6db785d4fab4e2f982387547e6a7 Mon Sep 17 00:00:00 2001 From: Eric Helgeson Date: Mon, 1 Feb 2021 11:39:50 -0600 Subject: [PATCH] Black code formatting --- src/web/create_disk.py | 28 +++--- src/web/file_cmds.py | 30 ++++-- src/web/pi_cmds.py | 17 +++- src/web/ractl_cmds.py | 68 +++++++++---- src/web/settings.py | 4 +- src/web/web.py | 212 +++++++++++++++++++++++------------------ 6 files changed, 215 insertions(+), 144 deletions(-) diff --git a/src/web/create_disk.py b/src/web/create_disk.py index 2958c69d..6bc521ff 100644 --- a/src/web/create_disk.py +++ b/src/web/create_disk.py @@ -7,39 +7,39 @@ from settings import * def make_cd(file_path, file_type, file_creator): with open(file_path, "rb") as f: file_bytes = f.read() - file_name = file_path.split('/')[-1] - file_suffix = file_name.split('.')[-1] + file_name = file_path.split("/")[-1] + file_suffix = file_name.split(".")[-1] if file_type is None and file_creator is None: - if file_suffix.lower() == 'sea': - file_type = '.sea' - file_creator = 'APPL' + if file_suffix.lower() == "sea": + file_type = ".sea" + file_creator = "APPL" v = Volume() v.name = "TestName" - v['Folder'] = Folder() + v["Folder"] = Folder() - v['Folder'][file_name] = File() - v['Folder'][file_name].data = file_bytes - v['Folder'][file_name].rsrc = b'' + v["Folder"][file_name] = File() + v["Folder"][file_name].data = file_bytes + v["Folder"][file_name].rsrc = b"" if not (file_type is None and file_creator is None): - v['Folder'][file_name].type = bytearray(file_type) - v['Folder'][file_name].creator = bytearray(file_creator) + v["Folder"][file_name].type = bytearray(file_type) + v["Folder"][file_name].creator = bytearray(file_creator) padding = (len(file_bytes) % 512) + (512 * 50) print("mod", str(len(file_bytes) % 512)) print("padding " + str(padding)) print("len " + str(len(file_bytes))) print("total " + str(len(file_bytes) + padding)) - with open(base_dir + 'test.hda', 'wb') as f: + with open(base_dir + "test.hda", "wb") as f: flat = v.write( size=len(file_bytes) + padding, align=512, # Allocation block alignment modulus (2048 for CDs) desktopdb=True, # Create a dummy Desktop Database to prevent a rebuild on boot bootable=False, # This requires a folder with a ZSYS and a FNDR file - startapp=('Folder', file_name), # Path (as tuple) to an app to open at boot + startapp=("Folder", file_name), # Path (as tuple) to an app to open at boot ) f.write(flat) - return base_dir + 'test.hda' + return base_dir + "test.hda" diff --git a/src/web/file_cmds.py b/src/web/file_cmds.py index 415735a1..58512ed0 100644 --- a/src/web/file_cmds.py +++ b/src/web/file_cmds.py @@ -5,8 +5,9 @@ import time from ractl_cmds import attach_image from settings import * -valid_file_types = ['*.hda', '*.iso', '*.cdr'] -valid_file_types = r'|'.join([fnmatch.translate(x) for x in valid_file_types]) + +valid_file_types = ["*.hda", "*.iso", "*.cdr"] +valid_file_types = r"|".join([fnmatch.translate(x) for x in valid_file_types]) def create_new_image(file_name, type, size): @@ -15,8 +16,10 @@ def create_new_image(file_name, type, size): else: file_name = file_name + "." + type - return subprocess.run(["dd", "if=/dev/zero", "of=" + base_dir + file_name, "bs=1M", "count=" + size], - capture_output=True) + return subprocess.run( + ["dd", "if=/dev/zero", "of=" + base_dir + file_name, "bs=1M", "count=" + size], + capture_output=True, + ) def delete_image(file_name): @@ -30,18 +33,24 @@ def delete_image(file_name): def unzip_file(file_name): import zipfile - with zipfile.ZipFile(base_dir + file_name, 'r') as zip_ref: + + with zipfile.ZipFile(base_dir + file_name, "r") as zip_ref: zip_ref.extractall(base_dir) return True + def rascsi_service(action): # start/stop/restart - return subprocess.run(["sudo", "/bin/systemctl", action, "rascsi.service"]).returncode == 0 + return ( + subprocess.run(["sudo", "/bin/systemctl", action, "rascsi.service"]).returncode + == 0 + ) def download_file_to_iso(scsi_id, url): import urllib.request - file_name = url.split('/')[-1] + + file_name = url.split("/")[-1] tmp_ts = int(time.time()) tmp_dir = "/tmp/" + str(tmp_ts) + "/" os.mkdir(tmp_dir) @@ -50,7 +59,9 @@ def download_file_to_iso(scsi_id, url): urllib.request.urlretrieve(url, tmp_full_path) # iso_filename = make_cd(tmp_full_path, None, None) # not working yet - iso_proc = subprocess.run(["genisoimage", "-hfs", "-o", iso_filename, tmp_full_path], capture_output=True) + iso_proc = subprocess.run( + ["genisoimage", "-hfs", "-o", iso_filename, tmp_full_path], capture_output=True + ) if iso_proc.returncode != 0: return iso_proc return attach_image(scsi_id, iso_filename, "cd") @@ -58,7 +69,8 @@ def download_file_to_iso(scsi_id, url): def download_image(url): import urllib.request - file_name = url.split('/')[-1] + + file_name = url.split("/")[-1] full_path = base_dir + file_name urllib.request.urlretrieve(url, full_path) diff --git a/src/web/pi_cmds.py b/src/web/pi_cmds.py index 26f13086..17f716cc 100644 --- a/src/web/pi_cmds.py +++ b/src/web/pi_cmds.py @@ -3,7 +3,10 @@ import subprocess def rascsi_service(action): # start/stop/restart - return subprocess.run(["sudo", "/bin/systemctl", action, "rascsi.service"]).returncode == 0 + return ( + subprocess.run(["sudo", "/bin/systemctl", action, "rascsi.service"]).returncode + == 0 + ) def reboot_pi(): @@ -15,6 +18,14 @@ def shutdown_pi(): def running_version(): - ra_web_version = subprocess.run(["git", "rev-parse", "HEAD"], capture_output=True).stdout.decode("utf-8").strip() - pi_version = subprocess.run(["uname", "-a"], capture_output=True).stdout.decode("utf-8").strip() + ra_web_version = ( + subprocess.run(["git", "rev-parse", "HEAD"], capture_output=True) + .stdout.decode("utf-8") + .strip() + ) + pi_version = ( + subprocess.run(["uname", "-a"], capture_output=True) + .stdout.decode("utf-8") + .strip() + ) return ra_web_version + " " + pi_version diff --git a/src/web/ractl_cmds.py b/src/web/ractl_cmds.py index 38614983..60205903 100644 --- a/src/web/ractl_cmds.py +++ b/src/web/ractl_cmds.py @@ -4,8 +4,9 @@ import subprocess import re from settings import * -valid_file_types = ['*.hda', '*.iso', '*.cdr', '*.zip'] -valid_file_types = r'|'.join([fnmatch.translate(x) for x in valid_file_types]) + +valid_file_types = ["*.hda", "*.iso", "*.cdr", "*.zip"] +valid_file_types = r"|".join([fnmatch.translate(x) for x in valid_file_types]) # List of SCSI ID's you'd like to exclude - eg if you are on a Mac, the System is usually 7 EXCLUDE_SCSI_IDS = [7] @@ -20,19 +21,27 @@ def list_files(): for path, dirs, files in os.walk(base_dir): # Only list valid file types files = [f for f in files if re.match(valid_file_types, f)] - files_list.extend([ - (os.path.join(path, file), - # TODO: move formatting to template - '{:,.0f}'.format(os.path.getsize(os.path.join(path, file)) / float(1 << 20)) + " MB") - for file in files]) + files_list.extend( + [ + ( + os.path.join(path, file), + # TODO: move formatting to template + "{:,.0f}".format( + os.path.getsize(os.path.join(path, file)) / float(1 << 20) + ) + + " MB", + ) + for file in files + ] + ) return files_list def get_valid_scsi_ids(devices): invalid_list = EXCLUDE_SCSI_IDS.copy() for device in devices: - if device['file'] != "NO MEDIA" and device['file'] != "-": - invalid_list.append(int(device['id'])) + if device["file"] != "NO MEDIA" and device["file"] != "-": + invalid_list.append(int(device["id"])) valid_list = list(range(8)) for id in invalid_list: @@ -50,7 +59,10 @@ def attach_image(scsi_id, image, type): if type == "cd" and get_type(scsi_id) == "SCCD": return insert(scsi_id, image) else: - return subprocess.run(["rasctl", "-c", "attach", "-t", type, "-i", scsi_id, "-f", image], capture_output=True) + return subprocess.run( + ["rasctl", "-c", "attach", "-t", type, "-i", scsi_id, "-f", image], + capture_output=True, + ) def detach_by_id(scsi_id): @@ -58,7 +70,9 @@ def detach_by_id(scsi_id): def disconnect_by_id(scsi_id): - return subprocess.run(["rasctl", "-c", "disconnect", "-i", scsi_id], capture_output=True) + return subprocess.run( + ["rasctl", "-c", "disconnect", "-i", scsi_id], capture_output=True + ) def eject_by_id(scsi_id): @@ -66,33 +80,45 @@ def eject_by_id(scsi_id): def insert(scsi_id, image): - return subprocess.run(["rasctl", "-i", scsi_id, "-c", "insert", "-f", image], capture_output=True) + return subprocess.run( + ["rasctl", "-i", scsi_id, "-c", "insert", "-f", image], capture_output=True + ) def rascsi_service(action): # start/stop/restart - return subprocess.run(["sudo", "/bin/systemctl", action, "rascsi.service"]).returncode == 0 + return ( + subprocess.run(["sudo", "/bin/systemctl", action, "rascsi.service"]).returncode + == 0 + ) def list_devices(): device_list = [] for id in range(8): device_list.append({"id": str(id), "un": "-", "type": "-", "file": "-"}) - output = subprocess.run(["rasctl", "-l"], capture_output=True).stdout.decode("utf-8") + output = subprocess.run(["rasctl", "-l"], capture_output=True).stdout.decode( + "utf-8" + ) for line in output.splitlines(): # Valid line to process, continue - if not line.startswith("+") and \ - not line.startswith("| ID |") and \ - (not line.startswith("No device is installed.") or line.startswith("No images currently attached.")) \ - and len(line) > 0: + if ( + not line.startswith("+") + and not line.startswith("| ID |") + and ( + not line.startswith("No device is installed.") + or line.startswith("No images currently attached.") + ) + and len(line) > 0 + ): line.rstrip() device = {} segments = line.split("|") if len(segments) > 4: idx = int(segments[1].strip()) device_list[idx]["id"] = str(idx) - device_list[idx]['un'] = segments[2].strip() - device_list[idx]['type'] = segments[3].strip() - device_list[idx]['file'] = segments[4].strip() + device_list[idx]["un"] = segments[2].strip() + device_list[idx]["type"] = segments[3].strip() + device_list[idx]["file"] = segments[4].strip() return device_list diff --git a/src/web/settings.py b/src/web/settings.py index 24acee41..85036168 100644 --- a/src/web/settings.py +++ b/src/web/settings.py @@ -1,4 +1,4 @@ import os -base_dir = os.getenv('BASE_DIR', "/home/pi/images/") -MAX_FILE_SIZE = os.getenv('MAX_FILE_SIZE', 1024 * 1024 * 1024 * 2) # 2gb +base_dir = os.getenv("BASE_DIR", "/home/pi/images/") +MAX_FILE_SIZE = os.getenv("MAX_FILE_SIZE", 1024 * 1024 * 1024 * 2) # 2gb diff --git a/src/web/web.py b/src/web/web.py index f2965a7a..0fe3a20f 100644 --- a/src/web/web.py +++ b/src/web/web.py @@ -3,204 +3,226 @@ import os from flask import Flask, render_template, request, flash, url_for, redirect, send_file from werkzeug.utils import secure_filename -from file_cmds import create_new_image, download_file_to_iso, delete_image, unzip_file, download_image +from file_cmds import ( + create_new_image, + download_file_to_iso, + delete_image, + unzip_file, + download_image, +) from pi_cmds import shutdown_pi, reboot_pi, running_version, rascsi_service -from ractl_cmds import attach_image, list_devices, is_active, list_files, detach_by_id, eject_by_id, get_valid_scsi_ids +from ractl_cmds import ( + attach_image, + list_devices, + is_active, + list_files, + detach_by_id, + eject_by_id, + get_valid_scsi_ids, +) from settings import * app = Flask(__name__) -@app.route('/') + +@app.route("/") def index(): devices = list_devices() scsi_ids = get_valid_scsi_ids(devices) - return render_template('index.html', - devices=devices, - active=is_active(), - files=list_files(), - base_dir=base_dir, - scsi_ids=scsi_ids, - max_file_size=MAX_FILE_SIZE, - version=running_version()) + return render_template( + "index.html", + devices=devices, + active=is_active(), + files=list_files(), + base_dir=base_dir, + scsi_ids=scsi_ids, + max_file_size=MAX_FILE_SIZE, + version=running_version(), + ) -@app.route('/logs') + +@app.route("/logs") def logs(): import subprocess - lines = request.args.get('lines') or "100" + + lines = request.args.get("lines") or "100" process = subprocess.run(["journalctl", "-n", lines], capture_output=True) if process.returncode == 0: - headers = { 'content-type':'text/plain' } + headers = {"content-type": "text/plain"} return process.stdout.decode("utf-8"), 200, headers else: - flash(u'Failed to get logs') - flash(process.stdout.decode("utf-8"), 'stdout') - flash(process.stderr.decode("utf-8"), 'stderr') - return redirect(url_for('index')) + flash(u"Failed to get logs") + flash(process.stdout.decode("utf-8"), "stdout") + flash(process.stderr.decode("utf-8"), "stderr") + return redirect(url_for("index")) -@app.route('/scsi/attach', methods=['POST']) +@app.route("/scsi/attach", methods=["POST"]) def attach(): - file_name = request.form.get('file_name') - scsi_id = request.form.get('scsi_id') + file_name = request.form.get("file_name") + scsi_id = request.form.get("scsi_id") # Validate image type by suffix - if file_name.lower().endswith('.iso') or file_name.lower().endswith('iso'): + if file_name.lower().endswith(".iso") or file_name.lower().endswith("iso"): image_type = "cd" - elif file_name.lower().endswith('.hda'): + elif file_name.lower().endswith(".hda"): image_type = "hd" else: - flash(u'Unknown file type. Valid files are .iso, .hda, .cdr', 'error') - return redirect(url_for('index')) + flash(u"Unknown file type. Valid files are .iso, .hda, .cdr", "error") + return redirect(url_for("index")) process = attach_image(scsi_id, file_name, image_type) if process.returncode == 0: - flash('Attached '+ file_name + " to scsi id " + scsi_id + "!") - return redirect(url_for('index')) + flash("Attached " + file_name + " to scsi id " + scsi_id + "!") + return redirect(url_for("index")) else: - flash(u'Failed to attach '+ file_name + " to scsi id " + scsi_id + "!", 'error') - flash(process.stdout.decode("utf-8"), 'stdout') - flash(process.stderr.decode("utf-8"), 'stderr') - return redirect(url_for('index')) + flash( + u"Failed to attach " + file_name + " to scsi id " + scsi_id + "!", "error" + ) + flash(process.stdout.decode("utf-8"), "stdout") + flash(process.stderr.decode("utf-8"), "stderr") + return redirect(url_for("index")) -@app.route('/scsi/detach', methods=['POST']) +@app.route("/scsi/detach", methods=["POST"]) def detach(): - scsi_id = request.form.get('scsi_id') + scsi_id = request.form.get("scsi_id") process = detach_by_id(scsi_id) if process.returncode == 0: flash("Detached scsi id " + scsi_id + "!") - return redirect(url_for('index')) + return redirect(url_for("index")) else: - flash(u"Failed to detach scsi id " + scsi_id + "!", 'error') - flash(process.stdout, 'stdout') - flash(process.stderr, 'stderr') - return redirect(url_for('index')) + flash(u"Failed to detach scsi id " + scsi_id + "!", "error") + flash(process.stdout, "stdout") + flash(process.stderr, "stderr") + return redirect(url_for("index")) -@app.route('/scsi/eject', methods=['POST']) +@app.route("/scsi/eject", methods=["POST"]) def eject(): - scsi_id = request.form.get('scsi_id') + scsi_id = request.form.get("scsi_id") process = eject_by_id(scsi_id) if process.returncode == 0: flash("Ejected scsi id " + scsi_id + "!") - return redirect(url_for('index')) + return redirect(url_for("index")) else: - flash(u"Failed to eject scsi id " + scsi_id + "!", 'error') - flash(process.stdout, 'stdout') - flash(process.stderr, 'stderr') - return redirect(url_for('index')) + flash(u"Failed to eject scsi id " + scsi_id + "!", "error") + flash(process.stdout, "stdout") + flash(process.stderr, "stderr") + return redirect(url_for("index")) -@app.route('/pi/reboot', methods=['POST']) +@app.route("/pi/reboot", methods=["POST"]) def restart(): reboot_pi() flash("Restarting...") - return redirect(url_for('index')) + return redirect(url_for("index")) -@app.route('/rascsi/restart', methods=['POST']) +@app.route("/rascsi/restart", methods=["POST"]) def rascsi_restart(): rascsi_service("restart") flash("Restarting RaSCSI Service...") - return redirect(url_for('index')) + return redirect(url_for("index")) -@app.route('/pi/shutdown', methods=['POST']) +@app.route("/pi/shutdown", methods=["POST"]) def shutdown(): shutdown_pi() flash("Shutting down...") - return redirect(url_for('index')) + return redirect(url_for("index")) -@app.route('/files/download_to_iso', methods=['POST']) +@app.route("/files/download_to_iso", methods=["POST"]) def download_file(): - scsi_id = request.form.get('scsi_id') - url = request.form.get('url') + scsi_id = request.form.get("scsi_id") + url = request.form.get("url") process = download_file_to_iso(scsi_id, url) if process.returncode == 0: flash("File Downloaded") - return redirect(url_for('index')) + return redirect(url_for("index")) else: - flash(u"Failed to download file", 'error') - flash(process.stdout, 'stdout') - flash(process.stderr, 'stderr') - return redirect(url_for('index')) + flash(u"Failed to download file", "error") + flash(process.stdout, "stdout") + flash(process.stderr, "stderr") + return redirect(url_for("index")) -@app.route('/files/download_image', methods=['POST']) +@app.route("/files/download_image", methods=["POST"]) def download_img(): - url = request.form.get('url') + url = request.form.get("url") # TODO: error handling download_image(url) flash("File Downloaded") - return redirect(url_for('index')) + return redirect(url_for("index")) -@app.route('/files/upload', methods=['POST']) +@app.route("/files/upload", methods=["POST"]) def upload_file(): - if 'file' not in request.files: - flash('No file part', 'error') - return redirect(url_for('index')) - file = request.files['file'] + if "file" not in request.files: + flash("No file part", "error") + return redirect(url_for("index")) + file = request.files["file"] if file: filename = secure_filename(file.filename) - file.save(os.path.join(app.config['UPLOAD_FOLDER'], filename)) - return redirect(url_for('index', filename=filename)) + file.save(os.path.join(app.config["UPLOAD_FOLDER"], filename)) + return redirect(url_for("index", filename=filename)) -@app.route('/files/create', methods=['POST']) +@app.route("/files/create", methods=["POST"]) def create_file(): - file_name = request.form.get('file_name') - size = request.form.get('size') - type = request.form.get('type') + file_name = request.form.get("file_name") + size = request.form.get("size") + type = request.form.get("type") process = create_new_image(file_name, type, size) if process.returncode == 0: flash("Drive created") - return redirect(url_for('index')) + return redirect(url_for("index")) else: - flash(u"Failed to create file", 'error') - flash(process.stdout, 'stdout') - flash(process.stderr, 'stderr') - return redirect(url_for('index')) + flash(u"Failed to create file", "error") + flash(process.stdout, "stdout") + flash(process.stderr, "stderr") + return redirect(url_for("index")) -@app.route('/files/download', methods=['POST']) +@app.route("/files/download", methods=["POST"]) def download(): - image = request.form.get('image') + image = request.form.get("image") return send_file(base_dir + image, as_attachment=True) -@app.route('/files/delete', methods=['POST']) +@app.route("/files/delete", methods=["POST"]) def delete(): - image = request.form.get('image') + image = request.form.get("image") if delete_image(image): flash("File " + image + " deleted") - return redirect(url_for('index')) + return redirect(url_for("index")) else: - flash(u"Failed to Delete " + image, 'error') - return redirect(url_for('index')) + flash(u"Failed to Delete " + image, "error") + return redirect(url_for("index")) -@app.route('/files/unzip', methods=['POST']) +@app.route("/files/unzip", methods=["POST"]) def unzip(): - image = request.form.get('image') + image = request.form.get("image") if unzip_file(image): flash("Unzipped file " + image) - return redirect(url_for('index')) + return redirect(url_for("index")) else: - flash(u"Failed to unzip " + image, 'error') - return redirect(url_for('index')) + flash(u"Failed to unzip " + image, "error") + return redirect(url_for("index")) if __name__ == "__main__": - app.secret_key = 'rascsi_is_awesome_insecure_secret_key' - app.config['SESSION_TYPE'] = 'filesystem' - app.config['UPLOAD_FOLDER'] = base_dir - os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True) - app.config['MAX_CONTENT_LENGTH'] = MAX_FILE_SIZE + app.secret_key = "rascsi_is_awesome_insecure_secret_key" + app.config["SESSION_TYPE"] = "filesystem" + app.config["UPLOAD_FOLDER"] = base_dir + os.makedirs(app.config["UPLOAD_FOLDER"], exist_ok=True) + app.config["MAX_CONTENT_LENGTH"] = MAX_FILE_SIZE from waitress import serve + serve(app, host="0.0.0.0", port=8080)