Refactor python code to address Sonarcloud issues (#900)

- copy/move/delete file class methods now take Path objects as arguments
- The file download endpoint in the Web UI uses the safer download from dir method
- Simplified logging
- Merged nested if statements
- Removed naked handling of unknown error states
- Added fallback empty list for drive_properties, to avoid errors when json file is missing or broken
- Move drive_properties to env[]
- Constants for common error messages
- Dummy variable for list comprehension
This commit is contained in:
Daniel Markstedt 2022-10-09 13:50:20 -07:00 committed by GitHub
parent ca23d9b7a3
commit 1b10b123d2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 105 additions and 154 deletions

View File

@ -23,4 +23,7 @@ ARCHIVE_FILE_SUFFIXES = [
# The RESERVATIONS list is used to keep track of the reserved ID memos.
# Initialize with a list of 8 empty strings.
RESERVATIONS = ["" for x in range(0, 8)]
RESERVATIONS = ["" for _ in range(0, 8)]
# Standard error message for shell commands
SHELL_ERROR = "Shell command: \"%s\" led to error: %s"

View File

@ -6,7 +6,7 @@ import os
import logging
import asyncio
from functools import lru_cache
from pathlib import PurePath
from pathlib import PurePath, Path
from zipfile import ZipFile, is_zipfile
from time import time
from subprocess import run, CalledProcessError
@ -16,12 +16,21 @@ from shutil import copyfile
import requests
import rascsi_interface_pb2 as proto
from rascsi.common_settings import CFG_DIR, CONFIG_FILE_SUFFIX, PROPERTIES_SUFFIX, ARCHIVE_FILE_SUFFIXES, RESERVATIONS
from rascsi.common_settings import (
CFG_DIR,
CONFIG_FILE_SUFFIX,
PROPERTIES_SUFFIX,
ARCHIVE_FILE_SUFFIXES,
RESERVATIONS,
SHELL_ERROR,
)
from rascsi.ractl_cmds import RaCtlCmds
from rascsi.return_codes import ReturnCodes
from rascsi.socket_cmds import SocketCmds
from util import unarchiver
FILE_READ_ERROR = "Unhandled exception when reading file: %s"
FILE_WRITE_ERROR = "Unhandled exception when writing to file: %s"
class FileCmds:
"""
@ -148,7 +157,7 @@ class FileCmds:
command.params["token"] = self.token
command.params["locale"] = self.locale
command.params["file"] = file_name + "." + file_type
command.params["file"] = f"{file_name}.{file_type}"
command.params["size"] = str(size)
command.params["read_only"] = "false"
@ -216,14 +225,15 @@ class FileCmds:
# noinspection PyMethodMayBeStatic
def delete_file(self, file_path):
"""
Takes (str) file_path with the full path to the file to delete
Takes (Path) file_path for the file to delete
Returns (dict) with (bool) status and (str) msg
"""
parameters = {
"file_path": file_path
}
if os.path.exists(file_path):
os.remove(file_path)
if file_path.exists():
file_path.unlink()
return {
"status": True,
"return_code": ReturnCodes.DELETEFILE_SUCCESS,
@ -238,14 +248,16 @@ class FileCmds:
# noinspection PyMethodMayBeStatic
def rename_file(self, file_path, target_path):
"""
Takes (str) file_path and (str) target_path
Takes:
- (Path) file_path for the file to rename
- (Path) target_path for the name to rename
Returns (dict) with (bool) status and (str) msg
"""
parameters = {
"target_path": target_path
}
if os.path.exists(PurePath(target_path).parent):
os.rename(file_path, target_path)
if target_path.parent.exists:
file_path.rename(target_path)
return {
"status": True,
"return_code": ReturnCodes.RENAMEFILE_SUCCESS,
@ -260,14 +272,16 @@ class FileCmds:
# noinspection PyMethodMayBeStatic
def copy_file(self, file_path, target_path):
"""
Takes (str) file_path and (str) target_path
Takes:
- (Path) file_path for the file to copy from
- (Path) target_path for the name to copy to
Returns (dict) with (bool) status and (str) msg
"""
parameters = {
"target_path": target_path
}
if os.path.exists(PurePath(target_path).parent):
copyfile(file_path, target_path)
if target_path.parent.exists:
copyfile(str(file_path), str(target_path))
return {
"status": True,
"return_code": ReturnCodes.WRITEFILE_SUCCESS,
@ -305,21 +319,22 @@ class FileCmds:
properties_files_moved = []
if move_properties_files_to_config:
for file in extract_result["extracted"]:
if file.get("name").endswith(".properties"):
if file.get("name").endswith(f".{PROPERTIES_SUFFIX}"):
prop_path = Path(CFG_DIR) / file["name"]
if (self.rename_file(
file["absolute_path"],
f"{CFG_DIR}/{file['name']}"
Path(file["absolute_path"]),
prop_path,
)):
properties_files_moved.append({
"status": True,
"name": file["path"],
"path": f"{CFG_DIR}/{file['name']}",
"path": str(prop_path),
})
else:
properties_files_moved.append({
"status": False,
"name": file["path"],
"path": f"{CFG_DIR}/{file['name']}",
"path": str(prop_path),
})
return {
@ -386,7 +401,7 @@ class FileCmds:
"%s was successfully unzipped. Deleting the zipfile.",
tmp_full_path,
)
self.delete_file(tmp_full_path)
self.delete_file(Path(tmp_full_path))
try:
run(
@ -401,8 +416,7 @@ class FileCmds:
check=True,
)
except CalledProcessError as error:
logging.warning("Executed shell command: %s", " ".join(error.cmd))
logging.warning("Got error: %s", error.stderr.decode("utf-8"))
logging.warning(SHELL_ERROR, " ".join(error.cmd), error.stderr.decode("utf-8"))
return {"status": False, "msg": error.stderr.decode("utf-8")}
parameters = {
@ -452,9 +466,9 @@ class FileCmds:
Takes (str) file_name
Returns (dict) with (bool) status and (str) msg
"""
file_name = f"{CFG_DIR}/{file_name}"
file_path = f"{CFG_DIR}/{file_name}"
try:
with open(file_name, "w", encoding="ISO-8859-1") as json_file:
with open(file_path, "w", encoding="ISO-8859-1") as json_file:
version = self.ractl.get_server_info()["version"]
devices = self.ractl.list_devices()["device_list"]
for device in devices:
@ -485,7 +499,7 @@ class FileCmds:
indent=4
)
parameters = {
"target_path": file_name
"target_path": file_path
}
return {
"status": True,
@ -494,19 +508,12 @@ class FileCmds:
}
except (IOError, ValueError, EOFError, TypeError) as error:
logging.error(str(error))
self.delete_file(file_name)
self.delete_file(Path(file_path))
return {"status": False, "msg": str(error)}
except:
logging.error("Could not write to file: %s", file_name)
self.delete_file(file_name)
parameters = {
"file_name": file_name
}
return {
"status": False,
"return_code": ReturnCodes.WRITEFILE_COULD_NOT_WRITE,
"parameters": parameters,
}
logging.error(FILE_WRITE_ERROR, file_name)
self.delete_file(Path(file_path))
raise
def read_config(self, file_name):
"""
@ -577,15 +584,8 @@ class FileCmds:
logging.error(str(error))
return {"status": False, "msg": str(error)}
except:
logging.error("Could not read file: %s", file_name)
parameters = {
"file_name": file_name
}
return {
"status": False,
"return_code": ReturnCodes.READCONFIG_COULD_NOT_READ,
"parameters": parameters
}
logging.error(FILE_READ_ERROR, file_name)
raise
def write_drive_properties(self, file_name, conf):
"""
@ -607,19 +607,12 @@ class FileCmds:
}
except (IOError, ValueError, EOFError, TypeError) as error:
logging.error(str(error))
self.delete_file(file_path)
self.delete_file(Path(file_path))
return {"status": False, "msg": str(error)}
except:
logging.error("Could not write to file: %s", file_path)
self.delete_file(file_path)
parameters = {
"target_path": file_path
}
return {
"status": False,
"return_code": ReturnCodes.WRITEFILE_COULD_NOT_WRITE,
"parameters": parameters,
}
logging.error(FILE_WRITE_ERROR, file_path)
self.delete_file(Path(file_path))
raise
# noinspection PyMethodMayBeStatic
def read_drive_properties(self, file_path):
@ -644,15 +637,8 @@ class FileCmds:
logging.error(str(error))
return {"status": False, "msg": str(error)}
except:
logging.error("Could not read file: %s", file_path)
parameters = {
"file_path": file_path
}
return {
"status": False,
"return_codes": ReturnCodes.READDRIVEPROPS_COULD_NOT_READ,
"parameters": parameters,
}
logging.error(FILE_READ_ERROR, file_path)
raise
# noinspection PyMethodMayBeStatic
async def run_async(self, program, args):

View File

@ -224,16 +224,13 @@ class RaCtlCmds:
devices = proto.PbDeviceDefinition()
devices.id = int(scsi_id)
if "device_type" in kwargs.keys():
if kwargs["device_type"]:
devices.type = proto.PbDeviceType.Value(str(kwargs["device_type"]))
if "unit" in kwargs.keys():
if kwargs["unit"]:
devices.unit = kwargs["unit"]
if "params" in kwargs.keys():
if isinstance(kwargs["params"], dict):
for param in kwargs["params"]:
devices.params[param] = kwargs["params"][param]
if kwargs.get("device_type"):
devices.type = proto.PbDeviceType.Value(str(kwargs["device_type"]))
if kwargs.get("unit"):
devices.unit = kwargs["unit"]
if kwargs.get("params") and isinstance(kwargs["params"], dict):
for param in kwargs["params"]:
devices.params[param] = kwargs["params"][param]
# Handling the inserting of media into an attached removable type device
device_type = kwargs.get("device_type", None)
@ -260,18 +257,14 @@ class RaCtlCmds:
# Handling attaching a new device
else:
command.operation = proto.PbOperation.ATTACH
if "vendor" in kwargs.keys():
if kwargs["vendor"]:
devices.vendor = kwargs["vendor"]
if "product" in kwargs.keys():
if kwargs["product"]:
devices.product = kwargs["product"]
if "revision" in kwargs.keys():
if kwargs["revision"]:
devices.revision = kwargs["revision"]
if "block_size" in kwargs.keys():
if kwargs["block_size"]:
devices.block_size = int(kwargs["block_size"])
if kwargs.get("vendor"):
devices.vendor = kwargs["vendor"]
if kwargs.get("product"):
devices.product = kwargs["product"]
if kwargs.get("revision"):
devices.revision = kwargs["revision"]
if kwargs.get("block_size"):
devices.block_size = int(kwargs["block_size"])
command.devices.append(devices)

View File

@ -8,6 +8,7 @@ from shutil import disk_usage
from re import findall, match
from socket import socket, gethostname, AF_INET, SOCK_DGRAM
from rascsi.common_settings import SHELL_ERROR
class SysCmds:
"""
@ -32,8 +33,7 @@ class SysCmds:
.strip()
)
except subprocess.CalledProcessError as error:
logging.warning("Executed shell command: %s", " ".join(error.cmd))
logging.warning("Got error: %s", error.stderr.decode("utf-8"))
logging.warning(SHELL_ERROR, error.cmd, error.stderr.decode("utf-8"))
ra_git_version = ""
try:
@ -47,9 +47,8 @@ class SysCmds:
.strip()
)
except subprocess.CalledProcessError as error:
logging.warning("Executed shell command: %s", " ".join(error.cmd))
logging.warning("Got error: %s", error.stderr.decode("utf-8"))
pi_version = "Unknown"
logging.warning(SHELL_ERROR, " ".join(error.cmd), error.stderr.decode("utf-8"))
pi_version = "?"
return {"git": ra_git_version, "env": pi_version}
@ -70,8 +69,7 @@ class SysCmds:
.strip()
)
except subprocess.CalledProcessError as error:
logging.warning("Executed shell command: %s", " ".join(error.cmd))
logging.warning("Got error: %s", error.stderr.decode("utf-8"))
logging.warning(SHELL_ERROR, error.cmd, error.stderr.decode("utf-8"))
processes = ""
matching_processes = findall(daemon, processes)
@ -93,8 +91,7 @@ class SysCmds:
.strip()
)
except subprocess.CalledProcessError as error:
logging.warning("Executed shell command: %s", " ".join(error.cmd))
logging.warning("Got error: %s", error.stderr.decode("utf-8"))
logging.warning(SHELL_ERROR, error.cmd, error.stderr.decode("utf-8"))
bridges = ""
if "rascsi_bridge" in bridges:

View File

@ -13,7 +13,7 @@
<th scope="col">{{ _("Description") }}</th>
<th scope="col">{{ _("Action") }}</th>
</tr>
{% for hd in drive_properties['hd_conf']|sort(attribute='name') %}
{% for hd in env['drive_properties']['hd_conf']|sort(attribute='name') %}
<tr>
<td align="center">
{% if hd.url != "" %}
@ -48,7 +48,7 @@
<th scope="col">{{ _("Description") }}</th>
<th scope="col">{{ _("Action") }}</th>
</tr>
{% for cd in drive_properties['cd_conf']|sort(attribute='name') %}
{% for cd in env['drive_properties']['cd_conf']|sort(attribute='name') %}
<tr>
<td align="center">
{% if cd.url != "" %}
@ -88,7 +88,7 @@
<th scope="col">{{ _("Description") }}</th>
<th scope="col">{{ _("Action") }}</th>
</tr>
{% for rm in drive_properties['rm_conf']|sort(attribute='name') %}
{% for rm in env['drive_properties']['rm_conf']|sort(attribute='name') %}
<tr>
<td align="center">
{% if rm.url != "" %}

View File

@ -256,7 +256,7 @@
{% endif %}
<td align="center">
<form action="/files/download" method="post">
<input name="file" type="hidden" value="{{ env["image_dir"] }}/{{ file['name'] }}">
<input name="file" type="hidden" value="{{ file['name'] }}">
<input type="submit" value="{{ file['size_mb'] }} {{ _("MiB") }} &#8595;">
</form>
</td>
@ -392,21 +392,21 @@
{{ _("None") }}
</option>
{% if type == "SCCD" %}
{% for drive in drive_properties["cd_conf"] | sort(attribute='name') %}
{% for drive in env["drive_properties"]["cd_conf"] | sort(attribute='name') %}
<option value="{{ drive.name }}">
{{ drive.name }}
</option>
{% endfor %}
{% endif %}
{% if type == "SCRM" %}
{% for drive in drive_properties["rm_conf"] | sort(attribute='name') %}
{% for drive in env["drive_properties"]["rm_conf"] | sort(attribute='name') %}
<option value="{{ drive.name }}">
{{ drive.name }}
</option>
{% endfor %}
{% endif %}
{% if type == "SCMO" %}
{% for drive in drive_properties["mo_conf"] | sort(attribute='name') %}
{% for drive in env["drive_properties"]["mo_conf"] | sort(attribute='name') %}
<option value="{{ drive.name }}">
{{ drive.name }}
</option>
@ -581,7 +581,7 @@
<option value="">
{{ _("None") }}
</option>
{% for drive in drive_properties["hd_conf"] | sort(attribute='name') %}
{% for drive in env["drive_properties"]["hd_conf"] | sort(attribute='name') %}
<option value="{{ drive.name }}">
{{ drive.name }}
</option>

View File

@ -22,7 +22,6 @@ from flask import (
flash,
url_for,
redirect,
send_file,
send_from_directory,
make_response,
session,
@ -98,6 +97,7 @@ def get_env_info():
"cd_suffixes": tuple(server_info["sccd"]),
"rm_suffixes": tuple(server_info["scrm"]),
"mo_suffixes": tuple(server_info["scmo"]),
"drive_properties": format_drive_properties(APP.config["RASCSI_DRIVE_PROPERTIES"]),
}
@ -229,16 +229,6 @@ def index():
server_info["sccd"]
)
try:
drive_properties = format_drive_properties(APP.config["RASCSI_DRIVE_PROPERTIES"])
except:
drive_properties = {
"hd_conf": [],
"cd_conf": [],
"rm_conf": [],
"mo_conf": [],
}
return response(
template="index.html",
locales=get_supported_locales(),
@ -257,7 +247,6 @@ def index():
reserved_scsi_ids=reserved_scsi_ids,
image_suffixes_to_create=image_suffixes_to_create,
valid_image_suffixes=valid_image_suffixes,
drive_properties=drive_properties,
max_file_size=int(int(MAX_FILE_SIZE) / 1024 / 1024),
RESERVATIONS=RESERVATIONS,
CFG_DIR=CFG_DIR,
@ -284,20 +273,10 @@ def drive_list():
"""
Sets up the data structures and kicks off the rendering of the drive list page
"""
try:
drive_properties = format_drive_properties(APP.config["RASCSI_DRIVE_PROPERTIES"])
except:
drive_properties = {
"hd_conf": [],
"cd_conf": [],
"rm_conf": [],
"mo_conf": [],
}
return response(
template="drives.html",
files=file_cmd.list_images()["files"],
drive_properties=drive_properties,
)
@ -310,10 +289,9 @@ def login():
password = request.form["password"]
groups = [g.gr_name for g in getgrall() if username in g.gr_mem]
if AUTH_GROUP in groups:
if authenticate(str(username), str(password)):
session["username"] = request.form["username"]
return response(env=get_env_info())
if AUTH_GROUP in groups and authenticate(str(username), str(password)):
session["username"] = request.form["username"]
return response(env=get_env_info())
return response(error=True, status_code=401, message=_(
"You must log in with valid credentials for a user in the '%(group)s' group",
@ -442,7 +420,8 @@ def config_load():
return response(error=True, message=process["msg"])
if "delete" in request.form:
process = file_cmd.delete_file(f"{CFG_DIR}/{file_name}")
file_path = Path(CFG_DIR) / file_name
process = file_cmd.delete_file(file_path)
process = ReturnCodeMapper.add_msg(process)
if process["status"]:
return response(message=process["msg"])
@ -954,8 +933,9 @@ def download():
"""
Downloads a file from the Pi to the local computer
"""
image = request.form.get("file")
return send_file(image, as_attachment=True)
file_name = request.form.get("file")
server_info = ractl_cmd.get_server_info()
return send_from_directory(server_info["image_dir"], file_name, as_attachment=True)
@APP.route("/files/delete", methods=["POST"])
@ -974,8 +954,8 @@ def delete():
(_("Image file deleted: %(file_name)s", file_name=file_name), "success")]
# Delete the drive properties file, if it exists
prop_file_path = f"{CFG_DIR}/{file_name}.{PROPERTIES_SUFFIX}"
if Path(prop_file_path).is_file():
prop_file_path = Path(CFG_DIR) / f"{file_name}.{PROPERTIES_SUFFIX}"
if prop_file_path.is_file():
process = file_cmd.delete_file(prop_file_path)
process = ReturnCodeMapper.add_msg(process)
if process["status"]:
@ -1003,9 +983,9 @@ def rename():
(_("Image file renamed to: %(file_name)s", file_name=new_file_name), "success")]
# Rename the drive properties file, if it exists
prop_file_path = f"{CFG_DIR}/{file_name}.{PROPERTIES_SUFFIX}"
new_prop_file_path = f"{CFG_DIR}/{new_file_name}.{PROPERTIES_SUFFIX}"
if Path(prop_file_path).is_file():
prop_file_path = Path(CFG_DIR) / f"{file_name}.{PROPERTIES_SUFFIX}"
new_prop_file_path = Path(CFG_DIR) / f"{new_file_name}.{PROPERTIES_SUFFIX}"
if prop_file_path.is_file():
process = file_cmd.rename_file(prop_file_path, new_prop_file_path)
process = ReturnCodeMapper.add_msg(process)
if process["status"]:
@ -1033,9 +1013,9 @@ def copy():
(_("Copy of image file saved as: %(file_name)s", file_name=new_file_name), "success")]
# Create a copy of the drive properties file, if it exists
prop_file_path = f"{CFG_DIR}/{file_name}.{PROPERTIES_SUFFIX}"
new_prop_file_path = f"{CFG_DIR}/{new_file_name}.{PROPERTIES_SUFFIX}"
if Path(prop_file_path).is_file():
prop_file_path = Path(CFG_DIR) / f"{file_name}.{PROPERTIES_SUFFIX}"
new_prop_file_path = Path(CFG_DIR) / f"{new_file_name}.{PROPERTIES_SUFFIX}"
if prop_file_path.is_file():
process = file_cmd.copy_file(prop_file_path, new_prop_file_path)
process = ReturnCodeMapper.add_msg(process)
if process["status"]:
@ -1171,8 +1151,10 @@ if __name__ == "__main__":
if process["status"]:
APP.config["RASCSI_DRIVE_PROPERTIES"] = process["conf"]
else:
APP.config["RASCSI_DRIVE_PROPERTIES"] = []
logging.error(process["msg"])
else:
APP.config["RASCSI_DRIVE_PROPERTIES"] = []
logging.warning("Could not read drive properties from %s", DRIVE_PROPERTIES_FILE)
logging.basicConfig(stream=sys.stdout,

View File

@ -294,18 +294,7 @@ def upload_with_dropzonejs(image_dir):
if current_chunk + 1 == total_chunks:
# Validate the resulting file size after writing the last chunk
if path.getsize(save_path) != int(request.form["dztotalfilesize"]):
log.error(
"Finished transferring %s, "
"but it has a size mismatch with the original file. "
"Got %s but we expected %s.",
file_object.filename,
path.getsize(save_path),
request.form['dztotalfilesize'],
)
log.error("File size mismatch between the original file and transferred file.")
return make_response(_("Transferred file corrupted!"), 500)
log.info("File %s has been uploaded successfully", file_object.filename)
log.debug("Chunk %s of %s for file %s completed.",
current_chunk + 1, total_chunks, file_object.filename)
return make_response(_("File upload successful!"), 200)

View File

@ -3,6 +3,7 @@ from conftest import STATUS_SUCCESS, STATUS_ERROR
# route("/login", methods=["POST"])
def test_login_with_valid_credentials(pytestconfig, http_client_unauthenticated):
# Note: This test depends on the rascsi group existing and 'username' a member the group
response = http_client_unauthenticated.post(
"/login",
data={

View File

@ -201,7 +201,7 @@ def test_upload_file(http_client, delete_file):
def test_download_file(http_client, create_test_image):
file_name = create_test_image()
response = http_client.post("/files/download", data={"file": f"{IMAGES_DIR}/{file_name}"})
response = http_client.post("/files/download", data={"file": file_name})
assert response.status_code == 200
assert response.headers["content-type"] == "application/octet-stream"

View File

@ -42,7 +42,7 @@ def test_show_named_drive_presets(http_client):
assert response.status_code == 200
assert response_data["status"] == STATUS_SUCCESS
assert "drive_properties" in response_data["data"]
assert "files" in response_data["data"]
# route("/drive/cdrom", methods=["POST"])