Merge pull request #846 from nucleogenic/webui-json-responses

JSON API and test suite for web UI
This commit is contained in:
nucleogenic 2022-09-26 00:18:06 +01:00 committed by GitHub
commit ed1285327a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
27 changed files with 1373 additions and 276 deletions

1
.gitignore vendored
View File

@ -11,6 +11,7 @@ src/raspberrypi/hfdisk/
*~
messages.pot
messages.mo
report.xml
docker/docker-compose.override.yml
/docker/volumes/images/*

View File

@ -6,18 +6,29 @@ FROM "${OS_ARCH}/${OS_DISTRO}:${OS_VERSION}"
EXPOSE 80 443
ARG DEBIAN_FRONTEND=noninteractive
RUN apt-get update && apt-get install -y --no-install-recommends sudo rsyslog procps
RUN apt-get update && apt-get install -y --no-install-recommends sudo systemd rsyslog procps
RUN groupadd pi
RUN useradd --create-home --shell /bin/bash -g pi pi
RUN echo "pi ALL=(ALL) NOPASSWD:ALL" >> /etc/sudoers
RUN echo "pi:rascsi" | chpasswd
WORKDIR /home/pi
RUN mkdir /home/pi/afpshare
RUN touch /etc/dhcpcd.conf
RUN mkdir -p /etc/network/interfaces.d/
WORKDIR /home/pi/RASCSI
USER pi
COPY --chown=pi:pi . RASCSI
RUN cd RASCSI && ./easyinstall.sh --run_choice=11 --skip-token
COPY --chown=pi:pi . .
# Standalone RaSCSI web UI
RUN ./easyinstall.sh --run_choice=11 --skip-token
# Wired network bridge
RUN ./easyinstall.sh --run_choice=6 --headless
USER root
WORKDIR /home/pi
RUN pip3 install watchdog
COPY docker/rascsi-web/start.sh /usr/local/bin/start.sh
RUN chmod +x /usr/local/bin/start.sh

View File

@ -6,15 +6,15 @@ FROM "${OS_ARCH}/${OS_DISTRO}:${OS_VERSION}"
EXPOSE 6868
ARG DEBIAN_FRONTEND=noninteractive
RUN apt-get update && apt-get install -y --no-install-recommends sudo rsyslog patch
RUN apt-get update && apt-get install -y --no-install-recommends sudo systemd rsyslog patch
RUN groupadd pi
RUN useradd --create-home --shell /bin/bash -g pi pi
RUN echo "pi ALL=(ALL) NOPASSWD:ALL" >> /etc/sudoers
USER pi
COPY --chown=pi:pi . /home/pi/RASCSI
WORKDIR /home/pi/RASCSI
USER pi
COPY --chown=pi:pi . .
# Workaround for Bullseye amd64 compilation error
# https://github.com/akuker/RASCSI/issues/821
@ -24,6 +24,7 @@ RUN patch -p0 < docker/rascsi/cfilesystem.patch
RUN ./easyinstall.sh --run_choice=10 --cores=`nproc` --skip-token
USER root
WORKDIR /home/pi
COPY docker/rascsi/rascsi_wrapper.sh /usr/local/bin/rascsi_wrapper.sh
RUN chmod +x /usr/local/bin/rascsi_wrapper.sh
CMD ["/usr/local/bin/rascsi_wrapper.sh", "-L", "trace", "-r", "7", "-F", "/home/pi/images"]

View File

@ -691,6 +691,8 @@ function setupWiredNetworking() {
echo "WARNING: If you continue, the IP address of your Pi may change upon reboot."
echo "Please make sure you will not lose access to the Pi system."
echo ""
if [[ -z $HEADLESS ]]; then
echo "Do you want to proceed with network configuration using the default settings? [Y/n]"
read REPLY
@ -701,13 +703,19 @@ function setupWiredNetworking() {
read SELECTED
LAN_INTERFACE=$SELECTED
fi
fi
if [ "$(grep -c "^denyinterfaces" /etc/dhcpcd.conf)" -ge 1 ]; then
echo "WARNING: Network forwarding may already have been configured. Proceeding will overwrite the configuration."
if [[ -z $HEADLESS ]]; then
echo "Press enter to continue or CTRL-C to exit"
read REPLY
fi
sudo sed -i /^denyinterfaces/d /etc/dhcpcd.conf
fi
sudo bash -c 'echo "denyinterfaces '$LAN_INTERFACE'" >> /etc/dhcpcd.conf'
echo "Modified /etc/dhcpcd.conf"
@ -720,6 +728,12 @@ function setupWiredNetworking() {
echo "Either use the Web UI, or do this on the command line (assuming SCSI ID 6):"
echo "rasctl -i 6 -c attach -t scdp -f $LAN_INTERFACE"
echo ""
if [[ $HEADLESS ]]; then
echo "Skipping reboot in headless mode"
return 0
fi
echo "We need to reboot your Pi"
echo "Press Enter to reboot or CTRL-C to exit"
read
@ -1269,6 +1283,7 @@ function runChoice() {
preparePythonCommon
cachePipPackages
installRaScsiWebInterface
enableWebInterfaceAuth
echo "Configuring RaSCSI Web Interface stand-alone - Complete!"
echo "Launch the Web Interface with the 'start.sh' script. To use a custom port for the web server: 'start.sh --web-port=8081"
;;
@ -1367,6 +1382,9 @@ while [ "$1" != "" ]; do
-s | --skip-token)
SKIP_TOKEN=1
;;
-h | --headless)
HEADLESS=1
;;
*)
echo "ERROR: Unknown parameter \"$PARAM\""
exit 1

View File

@ -521,6 +521,8 @@ class FileCmds:
# introduce more sophisticated format detection logic here.
if isinstance(config, dict):
self.ractl.detach_all()
for scsi_id in range(0, 8):
RESERVATIONS[scsi_id] = ""
ids_to_reserve = []
for item in config["reserved_ids"]:
ids_to_reserve.append(item["id"])

View File

@ -40,7 +40,7 @@ class RaCtlCmds:
version = (str(result.server_info.version_info.major_version) + "." +
str(result.server_info.version_info.minor_version) + "." +
str(result.server_info.version_info.patch_version))
log_levels = result.server_info.log_level_info.log_levels
log_levels = list(result.server_info.log_level_info.log_levels)
current_log_level = result.server_info.log_level_info.current_log_level
reserved_ids = list(result.server_info.reserved_ids_info.ids)
image_dir = result.server_info.image_files_info.default_image_folder
@ -113,7 +113,7 @@ class RaCtlCmds:
result = proto.PbResult()
result.ParseFromString(data)
ifs = result.network_interfaces_info.name
return {"status": result.status, "ifs": ifs}
return {"status": result.status, "ifs": list(ifs)}
def get_device_types(self):
"""
@ -140,7 +140,7 @@ class RaCtlCmds:
"removable": device.properties.removable,
"supports_file": device.properties.supports_file,
"params": params,
"block_sizes": device.properties.block_sizes,
"block_sizes": list(device.properties.block_sizes),
}
return {"status": result.status, "device_types": device_types}
@ -394,7 +394,7 @@ class RaCtlCmds:
dpath = result.devices_info.devices[i].file.name
dfile = dpath.replace(image_files_info["images_dir"] + "/", "")
dparam = result.devices_info.devices[i].params
dparam = dict(result.devices_info.devices[i].params)
dven = result.devices_info.devices[i].vendor
dprod = result.devices_info.devices[i].product
drev = result.devices_info.devices[i].revision

2
python/web/.flake8 Normal file
View File

@ -0,0 +1,2 @@
[flake8]
max-line-length = 100

View File

@ -0,0 +1,8 @@
[tool.pytest.ini_options]
addopts = "--junitxml=report.xml"
log_cli = true
log_cli_level = "warn"
[tool.black]
line-length = 100
target-version = ['py37', 'py38', 'py39']

View File

@ -0,0 +1,4 @@
pytest==7.1.3
pytest-httpserver==1.0.6
black==22.8.0
flake8==5.0.4

View File

@ -31,18 +31,33 @@ table, tr, td {
margin: none;
}
.error {
color: white;
font-size:20px;
background-color:red;
white-space: pre-line;
div.flash {
margin-top: 5px;
margin-bottom: 5px;
}
.message {
div.flash div {
color: white;
font-size:20px;
background-color:green;
font-size: 18px;
white-space: pre-line;
padding: 2px 5px;
}
div.flash div.success {
background-color: green;
}
div.flash div.warning {
background-color: orange;
color: black;
}
div.flash div.error {
background-color: red;
}
div.flash div.info {
background-color: #0d6efd;
}
td.inactive {

View File

@ -1,7 +1,7 @@
<!doctype html>
<html>
<head>
<title>{{ _("RaSCSI Reloaded Control Page") }} [{{ host }}]</title>
<title>{{ _("RaSCSI Reloaded Control Page") }} [{{ env["host"] }}]</title>
<meta name="viewport" content="width=device-width, initial-scale=1, maximum-scale=1" />
<link rel="apple-touch-icon" sizes="57x57" href="/pwa/apple-icon-57x57.png">
@ -26,12 +26,12 @@
<script type="application/javascript">
var processNotify = function(Notification) {
document.getElementById("flash").innerHTML = "<div class='message'>" + Notification + "{{ _(" This process may take a while, and will continue in the background if you navigate away from this page.") }}</div>";
document.getElementById("flash").innerHTML = "<div class=\"info\">" + Notification + "{{ _(" This process may take a while, and will continue in the background if you navigate away from this page.") }}</div>";
window.scrollTo(0,0);
}
var shutdownNotify = function(Notification) {
document.getElementById("flash").innerHTML = "<div class='message'>" + Notification + "{{ _(" The Web Interface will become unresponsive momentarily. Reload this page after the Pi has started up again.") }}</div>";
document.getElementById("flash").innerHTML = "<div class=\"info\">" + Notification + "{{ _(" The Web Interface will become unresponsive momentarily. Reload this page after the Pi has started up again.") }}</div>";
window.scrollTo(0,0);
}
</script>
@ -43,9 +43,9 @@
<body>
<div class="content">
<div class="header">
{% if auth_active %}
{% if username %}
<span style="display: inline-block; width: 100%; color: white; background-color: green; text-align: center; vertical-align: center; font-family: Arial, Helvetica, sans-serif;">{{ _("Logged in as <em>%(username)s</em>", username=username) }} &#8211; <a href="/logout">{{ _("Log Out") }}</a></span>
{% if env["auth_active"] %}
{% if env["username"] %}
<span style="display: inline-block; width: 100%; color: white; background-color: green; text-align: center; vertical-align: center; font-family: Arial, Helvetica, sans-serif;">{{ _("Logged in as <em>%(username)s</em>", username=env["username"]) }} &#8211; <a href="/logout">{{ _("Log Out") }}</a></span>
{% else %}
<span style="display: inline-block; width: 100%; color: white; background-color: red; text-align: center; vertical-align: center; font-family: Arial, Helvetica, sans-serif;">
<form method="POST" action="/login">
@ -70,7 +70,7 @@
</tr>
<tr>
<td style="color: white;">
hostname: {{ host }} ip: {{ ip_addr }}
hostname: {{ env["host"] }} ip: {{ env["ip_addr"] }}
</td>
</tr>
</tbody>
@ -89,8 +89,8 @@
{% block content %}{% endblock content %}
</div>
<div class="footer">
<center><tt>{{ _("RaSCSI Reloaded version: ") }}<strong>{{ version }} <a href="https://github.com/akuker/RASCSI/commit/{{ running_env['git'] }}" target="_blank">{{ running_env["git"][:7] }}</a></strong></tt></center>
<center><tt>{{ _("Pi environment: ") }}{{ running_env["env"] }}</tt></center>
<center><tt>{{ _("RaSCSI Reloaded version: ") }}<strong>{{ version }} <a href="https://github.com/akuker/RASCSI/commit/{{ env["running_env"]["git"] }}" target="_blank">{{ env["running_env"]["git"][:7] }}</a></strong></tt></center>
<center><tt>{{ _("Pi environment: ") }}{{ env["running_env"]["env"] }}</tt></center>
</div>
</div>
</body>

View File

@ -135,7 +135,7 @@
{% endfor %}
</tbody>
</table>
<p><small>{{ _("%(disk_space)s MiB disk space remaining on the Pi", disk_space=free_disk) }}</small></p>
<p><small>{{ _("%(disk_space)s MiB disk space remaining on the Pi", disk_space=env["free_disk_space"]) }}</small></p>
<p><a href="/">{{ _("Cancel") }}</a></p>
{% endblock content %}

View File

@ -156,8 +156,16 @@
<li>{{ _("Select a valid SCSI ID and <a href=\"%(url)s\">LUN</a> to attach to. Unless you know what you're doing, always use LUN 0.", url="https://en.wikipedia.org/wiki/Logical_unit_number") }}
</li>
<li>{{ _("If RaSCSI was unable to detect the media type associated with the image, you get to choose the type from the dropdown.") }}</li>
<li>{{ _("Recognized image file types: %(valid_image_suffixes)s", valid_image_suffixes=valid_image_suffixes) }}</li>
<li>{{ _("Recognized archive file types: %(ARCHIVE_FILE_SUFFIXES)s", ARCHIVE_FILE_SUFFIXES=ARCHIVE_FILE_SUFFIXES) }}</li>
<li>
{{ _("Recognized image file types:") }}
{% set comma = joiner(", ") %}
{% for extension in valid_image_suffixes %}{{ comma() }}.{{ extension}}{% endfor %}
</li>
<li>
{{ _("Recognized archive file types:") }}
{% set comma = joiner(", ") %}
{% for extension in ARCHIVE_FILE_SUFFIXES %}{{ comma() }}.{{ extension}}{% endfor %}
</li>
</ul>
</details>
@ -301,7 +309,7 @@
{% endfor %}
</tbody>
</table>
<p><small>{{ _("%(disk_space)s MiB disk space remaining on the Pi", disk_space=free_disk) }}</small></p>
<p><small>{{ _("%(disk_space)s MiB disk space remaining on the Pi", disk_space=env["free_disk_space"]) }}</small></p>
<hr/>
<details>

View File

@ -27,6 +27,7 @@ from flask import (
make_response,
session,
abort,
jsonify,
)
from rascsi.ractl_cmds import RaCtlCmds
@ -67,6 +68,66 @@ from settings import (
APP = Flask(__name__)
BABEL = Babel(APP)
def get_env_info():
"""
Get information about the app/host environment
"""
ip_addr, host = sys_cmd.get_ip_and_host()
if "username" in session:
username = session["username"]
else:
username = None
return {
"running_env": sys_cmd.running_env(),
"username": username,
"auth_active": auth_active(AUTH_GROUP)["status"],
"ip_addr": ip_addr,
"host": host,
"free_disk_space": int(sys_cmd.disk_space()["free"] / 1024 / 1024),
}
def response(
template=None,
message=None,
redirect_url=None,
error=False,
status_code=200,
**kwargs
):
"""
Generates a HTML or JSON HTTP response
"""
status = "error" if error else "success"
if isinstance(message, list):
messages = message
elif message is None:
messages = []
else:
messages = [(str(message), status)]
if request.headers.get("accept") == "application/json":
return jsonify({
"status": status,
"messages": [{"message": m, "category": c} for m, c in messages],
"data": kwargs
}), status_code
if messages:
for message, category in messages:
flash(message, category)
if template:
kwargs["env"] = get_env_info()
return render_template(template, **kwargs)
if redirect_url:
return redirect(url_for(redirect_url))
return redirect(url_for("index"))
@BABEL.localeselector
def get_locale():
@ -87,12 +148,14 @@ def get_locale():
def get_supported_locales():
"""
Returns a list of Locale objects that the Web Interfaces supports
Returns a list of languages supported by the web UI
"""
locales = BABEL.list_translations()
locales.append(Locale("en"))
sorted_locales = sorted(locales, key=lambda x: x.language)
return sorted_locales
locales = [
{"language": x.language, "display_name": x.display_name}
for x in [*BABEL.list_translations(), Locale("en")]
]
return sorted(locales, key=lambda x: x["language"])
# pylint: disable=too-many-locals
@ -147,7 +210,7 @@ def index():
server_info["scmo"]
)
valid_image_suffixes = "." + ", .".join(
valid_image_suffixes = (
server_info["schd"] +
server_info["scrm"] +
server_info["scmo"] +
@ -159,14 +222,12 @@ def index():
else:
username = None
return render_template(
"index.html",
return response(
template="index.html",
locales=get_supported_locales(),
bridge_configured=sys_cmd.is_bridge_setup(),
netatalk_configured=sys_cmd.running_proc("afpd"),
macproxy_configured=sys_cmd.running_proc("macproxy"),
ip_addr=ip_addr,
host=host,
devices=formatted_devices,
files=extended_image_files,
config_files=config_files,
@ -181,28 +242,32 @@ def index():
reserved_scsi_ids=reserved_scsi_ids,
RESERVATIONS=RESERVATIONS,
max_file_size=int(int(MAX_FILE_SIZE) / 1024 / 1024),
running_env=sys_cmd.running_env(),
version=server_info["version"],
log_levels=server_info["log_levels"],
current_log_level=server_info["current_log_level"],
netinfo=ractl_cmd.get_network_info(),
device_types=device_types,
free_disk=int(sys_cmd.disk_space()["free"] / 1024 / 1024),
image_suffixes_to_create=image_suffixes_to_create,
valid_image_suffixes=valid_image_suffixes,
cdrom_file_suffix=tuple(server_info["sccd"]),
removable_file_suffix=tuple(server_info["scrm"]),
mo_file_suffix=tuple(server_info["scmo"]),
username=username,
auth_active=auth_active(AUTH_GROUP)["status"],
PROPERTIES_SUFFIX=PROPERTIES_SUFFIX,
ARCHIVE_FILE_SUFFIXES="." + ", .".join(ARCHIVE_FILE_SUFFIXES),
ARCHIVE_FILE_SUFFIXES=ARCHIVE_FILE_SUFFIXES,
REMOVABLE_DEVICE_TYPES=ractl_cmd.get_removable_device_types(),
DISK_DEVICE_TYPES=ractl_cmd.get_disk_device_types(),
PERIPHERAL_DEVICE_TYPES=ractl_cmd.get_peripheral_device_types(),
)
@APP.route("/env")
def env():
"""
Shows information about the app/host environment
"""
return response(**get_env_info())
@APP.route("/drive/list", methods=["GET"])
def drive_list():
"""
@ -211,23 +276,20 @@ def drive_list():
# Reads the canonical drive properties into a dict
# The file resides in the current dir of the web ui process
drive_properties = Path(DRIVE_PROPERTIES_FILE)
if drive_properties.is_file():
if not drive_properties.is_file():
return response(
error=True,
message=_("Could not read drive properties from %(properties_file)s",
properties_file=drive_properties),
)
process = file_cmd.read_drive_properties(str(drive_properties))
process = ReturnCodeMapper.add_msg(process)
if not process["status"]:
flash(process["msg"], "error")
return redirect(url_for("index"))
conf = process["conf"]
else:
flash(
_(
"Could not read drive properties from %(properties_file)s",
properties_file=drive_properties,
),
"error",
)
return redirect(url_for("index"))
if not process["status"]:
return response(error=True, message=process["msg"])
conf = process["conf"]
hd_conf = []
cd_conf = []
rm_conf = []
@ -245,29 +307,17 @@ def drive_list():
device["size_mb"] = "{:,.2f}".format(device["size"] / 1024 / 1024)
rm_conf.append(device)
if "username" in session:
username = session["username"]
else:
username = None
server_info = ractl_cmd.get_server_info()
ip_addr, host = sys_cmd.get_ip_and_host()
return render_template(
return response(
"drives.html",
files=file_cmd.list_images()["files"],
base_dir=server_info["image_dir"],
hd_conf=hd_conf,
cd_conf=cd_conf,
rm_conf=rm_conf,
running_env=sys_cmd.running_env(),
version=server_info["version"],
free_disk=int(sys_cmd.disk_space()["free"] / 1024 / 1024),
cdrom_file_suffix=tuple(server_info["sccd"]),
username=username,
auth_active=auth_active(AUTH_GROUP)["status"],
ip_addr=ip_addr,
host=host,
)
@ -278,20 +328,17 @@ def login():
"""
username = request.form["username"]
password = request.form["password"]
groups = [g.gr_name for g in getgrall() if username in g.gr_mem]
if AUTH_GROUP in groups:
if authenticate(str(username), str(password)):
session["username"] = request.form["username"]
return redirect(url_for("index"))
flash(
_(
"You must log in with credentials for a user in the '%(group)s' group",
return response(env=get_env_info())
return response(error=True, status_code=401, message=_(
"You must log in with valid credentials for a user in the '%(group)s' group",
group=AUTH_GROUP,
),
"error",
)
return redirect(url_for("index"))
))
@APP.route("/logout")
@ -300,7 +347,7 @@ def logout():
Removes the logged in user from the session
"""
session.pop("username", None)
return redirect(url_for("index"))
return response()
@APP.route("/pwa/<path:pwa_path>")
@ -319,8 +366,7 @@ def login_required(func):
def decorated_function(*args, **kwargs):
auth = auth_active(AUTH_GROUP)
if auth["status"] and "username" not in session:
flash(auth["msg"], "error")
return redirect(url_for("index"))
return response(error=True, message=auth["msg"])
return func(*args, **kwargs)
return decorated_function
@ -342,11 +388,8 @@ def drive_create():
# Creating the image file
process = file_cmd.create_new_image(file_name, file_type, size)
if process["status"]:
flash(_("Image file created: %(file_name)s", file_name=full_file_name))
else:
flash(process["msg"], "error")
return redirect(url_for("index"))
if not process["status"]:
return response(error=True, message=process["msg"])
# Creating the drive properties file
prop_file_name = f"{file_name}.{file_type}.{PROPERTIES_SUFFIX}"
@ -358,12 +401,10 @@ def drive_create():
}
process = file_cmd.write_drive_properties(prop_file_name, properties)
process = ReturnCodeMapper.add_msg(process)
if process["status"]:
flash(process["msg"])
return redirect(url_for("index"))
if not process["status"]:
return response(error=True, message=process["msg"])
flash(process['msg'], "error")
return redirect(url_for("index"))
return response(message=_("Image file created: %(file_name)s", file_name=full_file_name))
@APP.route("/drive/cdrom", methods=["POST"])
@ -389,11 +430,9 @@ def drive_cdrom():
process = file_cmd.write_drive_properties(file_name, properties)
process = ReturnCodeMapper.add_msg(process)
if process["status"]:
flash(process["msg"])
return redirect(url_for("index"))
return response(message=process["msg"])
flash(process['msg'], "error")
return redirect(url_for("index"))
return response(error=True, message=process["msg"])
@APP.route("/config/save", methods=["POST"])
@ -408,11 +447,9 @@ def config_save():
process = file_cmd.write_config(file_name)
process = ReturnCodeMapper.add_msg(process)
if process["status"]:
flash(process["msg"])
return redirect(url_for("index"))
return response(message=process["msg"])
flash(process['msg'], "error")
return redirect(url_for("index"))
return response(error=True, message=process["msg"])
@APP.route("/config/load", methods=["POST"])
@ -427,24 +464,19 @@ def config_load():
process = file_cmd.read_config(file_name)
process = ReturnCodeMapper.add_msg(process)
if process["status"]:
flash(process["msg"])
return redirect(url_for("index"))
return response(message=process["msg"])
return response(error=True, message=process["msg"])
flash(process['msg'], "error")
return redirect(url_for("index"))
if "delete" in request.form:
process = file_cmd.delete_file(f"{CFG_DIR}/{file_name}")
process = ReturnCodeMapper.add_msg(process)
if process["status"]:
flash(process["msg"])
return redirect(url_for("index"))
return response(message=process["msg"])
flash(process['msg'], "error")
return redirect(url_for("index"))
return response(error=True, message=process["msg"])
# The only reason we would reach here would be a Web UI bug. Will not localize.
flash("Got an unhandled request (needs to be either load or delete)", "error")
return redirect(url_for("index"))
return response(error=True, message="Action field (load, delete) missing")
@APP.route("/logs/show", methods=["POST"])
@ -455,14 +487,16 @@ def show_logs():
lines = request.form.get("lines")
scope = request.form.get("scope")
# TODO: Render logs in a template (issue #836) and structured JSON
returncode, logs = sys_cmd.get_logs(lines, scope)
if returncode == 0:
headers = {"content-type": "text/plain"}
return logs, int(lines), headers
return logs, headers
flash(_("An error occurred when fetching logs."))
flash(logs, "stderr")
return redirect(url_for("index"))
return response(error=True, message=[
(_("An error occurred when fetching logs."), "error"),
(logs, "stderr"),
])
@APP.route("/logs/level", methods=["POST"])
@ -475,11 +509,9 @@ def log_level():
process = ractl_cmd.set_log_level(level)
if process["status"]:
flash(_("Log level set to %(value)s", value=level))
return redirect(url_for("index"))
return response(message=_("Log level set to %(value)s", value=level))
flash(process["msg"], "error")
return redirect(url_for("index"))
return response(error=True, message=process["msg"])
@APP.route("/scsi/attach_device", methods=["POST"])
@ -505,11 +537,13 @@ def attach_device():
error_msg = _("Please follow the instructions at %(url)s", url=error_url)
if "interface" in params.keys():
# Note: is_bridge_configured returns False if the bridge is configured
bridge_status = is_bridge_configured(params["interface"])
if bridge_status:
flash(bridge_status, "error")
flash(error_msg, "error")
return redirect(url_for("index"))
return response(error=True, message=[
(bridge_status, "error"),
(error_msg, "error")
])
kwargs = {
"unit": int(unit),
@ -519,16 +553,14 @@ def attach_device():
process = ractl_cmd.attach_device(scsi_id, **kwargs)
process = ReturnCodeMapper.add_msg(process)
if process["status"]:
flash(_(
return response(message=_(
"Attached %(device_type)s to SCSI ID %(id_number)s LUN %(unit_number)s",
device_type=get_device_name(device_type),
id_number=scsi_id,
unit_number=unit,
))
return redirect(url_for("index"))
flash(process["msg"], "error")
return redirect(url_for("index"))
return response(error=True, message=process["msg"])
@APP.route("/scsi/attach", methods=["POST"])
@ -557,8 +589,7 @@ def attach_image():
process = file_cmd.read_drive_properties(drive_properties)
process = ReturnCodeMapper.add_msg(process)
if not process["status"]:
flash(process["msg"], "error")
return redirect(url_for("index"))
return response(error=True, message=process["msg"])
conf = process["conf"]
kwargs["vendor"] = conf["vendor"]
kwargs["product"] = conf["product"]
@ -569,28 +600,31 @@ def attach_image():
process = ractl_cmd.attach_device(scsi_id, **kwargs)
process = ReturnCodeMapper.add_msg(process)
if process["status"]:
flash(_(
response_messages = [(_(
"Attached %(file_name)s as %(device_type)s to "
"SCSI ID %(id_number)s LUN %(unit_number)s",
file_name=file_name,
device_type=get_device_name(device_type),
id_number=scsi_id,
unit_number=unit,
))
), "success")]
if int(file_size) % int(expected_block_size):
flash(_(
response_messages.append((_(
"The image file size %(file_size)s bytes is not a multiple of "
"%(block_size)s. RaSCSI will ignore the trailing data. "
"The image may be corrupted, so proceed with caution.",
file_size=file_size,
block_size=expected_block_size,
), "error")
return redirect(url_for("index"))
), "warning"))
flash(_("Failed to attach %(file_name)s to SCSI ID %(id_number)s LUN %(unit_number)s",
file_name=file_name, id_number=scsi_id, unit_number=unit), "error")
flash(process["msg"], "error")
return redirect(url_for("index"))
return response(message=response_messages)
return response(error=True, message=[
(_("Failed to attach %(file_name)s to SCSI ID %(id_number)s LUN %(unit_number)s",
file_name=file_name, id_number=scsi_id, unit_number=unit), "error"),
(process["msg"], "error"),
])
@APP.route("/scsi/detach_all", methods=["POST"])
@ -601,11 +635,9 @@ def detach_all_devices():
"""
process = ractl_cmd.detach_all()
if process["status"]:
flash(_("Detached all SCSI devices"))
return redirect(url_for("index"))
return response(message=_("Detached all SCSI devices"))
flash(process["msg"], "error")
return redirect(url_for("index"))
return response(error=True, message=process["msg"])
@APP.route("/scsi/detach", methods=["POST"])
@ -618,14 +650,14 @@ def detach():
unit = request.form.get("unit")
process = ractl_cmd.detach_by_id(scsi_id, unit)
if process["status"]:
flash(_("Detached SCSI ID %(id_number)s LUN %(unit_number)s",
return response(message=_("Detached SCSI ID %(id_number)s LUN %(unit_number)s",
id_number=scsi_id, unit_number=unit))
return redirect(url_for("index"))
flash(_("Failed to detach SCSI ID %(id_number)s LUN %(unit_number)s",
id_number=scsi_id, unit_number=unit), "error")
flash(process["msg"], "error")
return redirect(url_for("index"))
return response(error=True, message=[
(_("Failed to detach SCSI ID %(id_number)s LUN %(unit_number)s",
id_number=scsi_id, unit_number=unit), "error"),
(process["msg"], "error"),
])
@APP.route("/scsi/eject", methods=["POST"])
@ -639,14 +671,15 @@ def eject():
process = ractl_cmd.eject_by_id(scsi_id, unit)
if process["status"]:
flash(_("Ejected SCSI ID %(id_number)s LUN %(unit_number)s",
return response(message=_("Ejected SCSI ID %(id_number)s LUN %(unit_number)s",
id_number=scsi_id, unit_number=unit))
return redirect(url_for("index"))
flash(_("Failed to eject SCSI ID %(id_number)s LUN %(unit_number)s",
id_number=scsi_id, unit_number=unit), "error")
flash(process["msg"], "error")
return redirect(url_for("index"))
return response(error=True, message=[
(_("Failed to eject SCSI ID %(id_number)s LUN %(unit_number)s",
id_number=scsi_id, unit_number=unit), "error"),
(process["msg"], "error"),
])
@APP.route("/scsi/info", methods=["POST"])
def device_info():
@ -660,29 +693,48 @@ def device_info():
# First check if any device at all was returned
if not devices["status"]:
flash(devices["msg"], "error")
return redirect(url_for("index"))
return response(error=True, message=devices["msg"])
# Looking at the first dict in list to get
# the one and only device that should have been returned
device = devices["device_list"][0]
if str(device["id"]) == scsi_id:
flash(_("DEVICE INFO"))
flash("===========")
flash(_("SCSI ID: %(id_number)s", id_number=device["id"]))
flash(_("LUN: %(unit_number)s", unit_number=device["unit"]))
flash(_("Type: %(device_type)s", device_type=device["device_type"]))
flash(_("Status: %(device_status)s", device_status=device["status"]))
flash(_("File: %(image_file)s", image_file=device["image"]))
flash(_("Parameters: %(value)s", value=device["params"]))
flash(_("Vendor: %(value)s", value=device["vendor"]))
flash(_("Product: %(value)s", value=device["product"]))
flash(_("Revision: %(revision_number)s", revision_number=device["revision"]))
flash(_("Block Size: %(value)s bytes", value=device["block_size"]))
flash(_("Image Size: %(value)s bytes", value=device["size"]))
return redirect(url_for("index"))
# TODO: Move the device info to the template instead of a flash message
message = "\n".join([
_("DEVICE INFO"),
"===========",
_("SCSI ID: %(id_number)s", id_number=device["id"]),
_("LUN: %(unit_number)s", unit_number=device["unit"]),
_("Type: %(device_type)s", device_type=device["device_type"]),
_("Status: %(device_status)s", device_status=device["status"]),
_("File: %(image_file)s", image_file=device["image"]),
_("Parameters: %(value)s", value=device["params"]),
_("Vendor: %(value)s", value=device["vendor"]),
_("Product: %(value)s", value=device["product"]),
_("Revision: %(revision_number)s", revision_number=device["revision"]),
_("Block Size: %(value)s bytes", value=device["block_size"]),
_("Image Size: %(value)s bytes", value=device["size"]),
])
# Don't send redundant "info" message with the JSON response
if request.headers.get("accept") == "application/json":
return response(device_info={
"scsi_id": device["id"],
"lun": device["unit"],
"device_type": device["device_type"],
"status": device["status"],
"file": device["image"],
"parameters": device["params"],
"vendor": device["vendor"],
"product": device["product"],
"revision": device["revision"],
"block_size": device["block_size"],
"size": device["size"],
})
return response(message=[(message, "info")])
return response(error=True, message=devices["msg"])
flash(devices["msg"], "error")
return redirect(url_for("index"))
@APP.route("/scsi/reserve", methods=["POST"])
@login_required
@ -697,12 +749,13 @@ def reserve_id():
process = ractl_cmd.reserve_scsi_ids(reserved_ids)
if process["status"]:
RESERVATIONS[int(scsi_id)] = memo
flash(_("Reserved SCSI ID %(id_number)s", id_number=scsi_id))
return redirect(url_for("index"))
return response(message=_("Reserved SCSI ID %(id_number)s", id_number=scsi_id))
return response(error=True, message=[
(_("Failed to reserve SCSI ID %(id_number)s", id_number=scsi_id), "error"),
(process["msg"], "error"),
])
flash(_("Failed to reserve SCSI ID %(id_number)s", id_number=scsi_id))
flash(process["msg"], "error")
return redirect(url_for("index"))
@APP.route("/scsi/release", methods=["POST"])
@login_required
@ -716,12 +769,12 @@ def release_id():
process = ractl_cmd.reserve_scsi_ids(reserved_ids)
if process["status"]:
RESERVATIONS[int(scsi_id)] = ""
flash(_("Released the reservation for SCSI ID %(id_number)s", id_number=scsi_id))
return redirect(url_for("index"))
return response(message=_("Released the reservation for SCSI ID %(id_number)s", id_number=scsi_id))
flash(_("Failed to release the reservation for SCSI ID %(id_number)s", id_number=scsi_id))
flash(process["msg"], "error")
return redirect(url_for("index"))
return response(error=True, message=[
(_("Failed to release the reservation for SCSI ID %(id_number)s", id_number=scsi_id), "error"),
(process["msg"], "error"),
])
@APP.route("/pi/reboot", methods=["POST"])
@ -731,7 +784,7 @@ def restart():
Restarts the Pi
"""
ractl_cmd.shutdown_pi("reboot")
return redirect(url_for("index"))
return response()
@APP.route("/pi/shutdown", methods=["POST"])
@ -741,7 +794,7 @@ def shutdown():
Shuts down the Pi
"""
ractl_cmd.shutdown_pi("system")
return redirect(url_for("index"))
return response()
@APP.route("/files/download_to_iso", methods=["POST"])
@ -753,16 +806,18 @@ def download_to_iso():
scsi_id = request.form.get("scsi_id")
url = request.form.get("url")
iso_args = request.form.get("type").split()
response_messages = []
process = file_cmd.download_file_to_iso(url, *iso_args)
process = ReturnCodeMapper.add_msg(process)
if process["status"]:
flash(process["msg"])
flash(_("Saved image as: %(file_name)s", file_name=process['file_name']))
else:
flash(_("Failed to create CD-ROM image from %(url)s", url=url), "error")
flash(process["msg"], "error")
return redirect(url_for("index"))
if not process["status"]:
return response(error=True, message=[
(_("Failed to create CD-ROM image from %(url)s", url=url), "error"),
(process["msg"], "error"),
])
response_messages.append((process["msg"], "success"))
response_messages.append((_("Saved image as: %(file_name)s", file_name=process['file_name']), "success"))
process_attach = ractl_cmd.attach_device(
scsi_id,
@ -771,13 +826,13 @@ def download_to_iso():
)
process_attach = ReturnCodeMapper.add_msg(process_attach)
if process_attach["status"]:
flash(_("Attached to SCSI ID %(id_number)s", id_number=scsi_id))
return redirect(url_for("index"))
response_messages.append((_("Attached to SCSI ID %(id_number)s", id_number=scsi_id), "success"))
return response(message=response_messages)
flash(_("Failed to attach image to SCSI ID %(id_number)s. Try attaching it manually.",
id_number=scsi_id), "error")
flash(process_attach["msg"], "error")
return redirect(url_for("index"))
return response(error=True, message=[
(_("Failed to attach image to SCSI ID %(id_number)s. Try attaching it manually.", id_number=scsi_id), "error"),
(process_attach["msg"], "error"),
])
@APP.route("/files/download_to_images", methods=["POST"])
@ -791,12 +846,12 @@ def download_img():
process = file_cmd.download_to_dir(url, server_info["image_dir"], Path(url).name)
process = ReturnCodeMapper.add_msg(process)
if process["status"]:
flash(process["msg"])
return redirect(url_for("index"))
return response(message=process["msg"])
flash(_("Failed to download file from %(url)s", url=url), "error")
flash(process["msg"], "error")
return redirect(url_for("index"))
return response(error=True, message=[
(_("Failed to download file from %(url)s", url=url), "error"),
(process["msg"], "error"),
])
@APP.route("/files/download_to_afp", methods=["POST"])
@ -810,12 +865,12 @@ def download_afp():
process = file_cmd.download_to_dir(url, AFP_DIR, file_name)
process = ReturnCodeMapper.add_msg(process)
if process["status"]:
flash(process["msg"])
return redirect(url_for("index"))
return response(message=process["msg"])
flash(_("Failed to download file from %(url)s", url=url), "error")
flash(process["msg"], "error")
return redirect(url_for("index"))
return response(error=True, message=[
(_("Failed to download file from %(url)s", url=url), "error"),
(process["msg"], "error"),
])
@APP.route("/files/upload", methods=["POST"])
@ -846,11 +901,13 @@ def create_file():
process = file_cmd.create_new_image(file_name, file_type, size)
if process["status"]:
flash(_("Image file created: %(file_name)s", file_name=full_file_name))
return redirect(url_for("index"))
return response(
status_code=201,
message=_("Image file created: %(file_name)s", file_name=full_file_name),
image=full_file_name,
)
flash(process["msg"], "error")
return redirect(url_for("index"))
return response(error=True, message=process["msg"])
@APP.route("/files/download", methods=["POST"])
@ -872,26 +929,23 @@ def delete():
file_name = request.form.get("file_name")
process = file_cmd.delete_image(file_name)
if process["status"]:
flash(_("Image file deleted: %(file_name)s", file_name=file_name))
else:
flash(process["msg"], "error")
return redirect(url_for("index"))
if not process["status"]:
return response(error=True, message=process["msg"])
response_messages = [
(_("Image file deleted: %(file_name)s", file_name=file_name), "success")]
# Delete the drive properties file, if it exists
prop_file_path = f"{CFG_DIR}/{file_name}.{PROPERTIES_SUFFIX}"
if Path(prop_file_path).is_file():
process = file_cmd.delete_file(prop_file_path)
process = ReturnCodeMapper.add_msg(process)
if process["status"]:
flash(process["msg"])
return redirect(url_for("index"))
response_messages.append((process["msg"], "success"))
else:
response_messages.append((process["msg"], "error"))
flash(process["msg"], "error")
return redirect(url_for("index"))
return redirect(url_for("index"))
return response(message=response_messages)
@APP.route("/files/rename", methods=["POST"])
@ -904,11 +958,11 @@ def rename():
new_file_name = request.form.get("new_file_name")
process = file_cmd.rename_image(file_name, new_file_name)
if process["status"]:
flash(_("Image file renamed to: %(file_name)s", file_name=new_file_name))
else:
flash(process["msg"], "error")
return redirect(url_for("index"))
if not process["status"]:
return response(error=True, message=process["msg"])
response_messages = [
(_("Image file renamed to: %(file_name)s", file_name=new_file_name), "success")]
# Rename the drive properties file, if it exists
prop_file_path = f"{CFG_DIR}/{file_name}.{PROPERTIES_SUFFIX}"
@ -917,13 +971,11 @@ def rename():
process = file_cmd.rename_file(prop_file_path, new_prop_file_path)
process = ReturnCodeMapper.add_msg(process)
if process["status"]:
flash(process["msg"])
return redirect(url_for("index"))
response_messages.append((process["msg"], "success"))
else:
response_messages.append((process["msg"], "error"))
flash(process["msg"], "error")
return redirect(url_for("index"))
return redirect(url_for("index"))
return response(message=response_messages)
@APP.route("/files/copy", methods=["POST"])
@ -936,11 +988,11 @@ def copy():
new_file_name = request.form.get("copy_file_name")
process = file_cmd.copy_image(file_name, new_file_name)
if process["status"]:
flash(_("Copy of image file saved as: %(file_name)s", file_name=new_file_name))
else:
flash(process["msg"], "error")
return redirect(url_for("index"))
if not process["status"]:
return response(error=True, message=process["msg"])
response_messages = [
(_("Copy of image file saved as: %(file_name)s", file_name=new_file_name), "success")]
# Create a copy of the drive properties file, if it exists
prop_file_path = f"{CFG_DIR}/{file_name}.{PROPERTIES_SUFFIX}"
@ -949,13 +1001,11 @@ def copy():
process = file_cmd.copy_file(prop_file_path, new_prop_file_path)
process = ReturnCodeMapper.add_msg(process)
if process["status"]:
flash(process["msg"])
return redirect(url_for("index"))
response_messages.append((process["msg"], "success"))
else:
response_messages.append((process["msg"], "error"))
flash(process["msg"], "error")
return redirect(url_for("index"))
return redirect(url_for("index"))
return response(message=response_messages)
@APP.route("/files/extract_image", methods=["POST"])
@ -974,23 +1024,23 @@ def extract_image():
)
if extract_result["return_code"] == ReturnCodes.EXTRACTIMAGE_SUCCESS:
flash(ReturnCodeMapper.add_msg(extract_result).get("msg"))
response_messages = [(ReturnCodeMapper.add_msg(extract_result).get("msg"), "success")]
for properties_file in extract_result["properties_files_moved"]:
if properties_file["status"]:
flash(_("Properties file %(file)s moved to %(directory)s",
response_messages.append((_("Properties file %(file)s moved to %(directory)s",
file=properties_file['name'],
directory=CFG_DIR
))
), "success"))
else:
flash(_("Failed to move properties file %(file)s to %(directory)s",
response_messages.append((_("Failed to move properties file %(file)s to %(directory)s",
file=properties_file['name'],
directory=CFG_DIR
), "error")
else:
flash(ReturnCodeMapper.add_msg(extract_result).get("msg"), "error")
), "error"))
return redirect(url_for("index"))
return response(message=response_messages)
return response(error=True, message=ReturnCodeMapper.add_msg(extract_result).get("msg"))
@APP.route("/language", methods=["POST"])
@ -1006,8 +1056,7 @@ def change_language():
language = Locale.parse(locale)
language_name = language.get_language_name(locale)
flash(_("Changed Web Interface language to %(locale)s", locale=language_name))
return redirect(url_for("index"))
return response(message=_("Changed Web Interface language to %(locale)s", locale=language_name))
@APP.before_first_request

View File

@ -62,8 +62,7 @@ if ! test -e venv; then
pip3 install wheel
pip3 install -r requirements.txt
git rev-parse --is-inside-work-tree &> /dev/null
if [[ $? -eq 0 ]]; then
if git rev-parse --is-inside-work-tree &> /dev/null; then
git rev-parse HEAD > current
fi
fi

View File

@ -0,0 +1,78 @@
import pytest
import uuid
CFG_DIR = "/home/pi/.config/rascsi"
IMAGES_DIR = "/home/pi/images"
AFP_DIR = "/home/pi/afpshare"
SCSI_ID = 6
FILE_SIZE_1_MIB = 1048576
STATUS_SUCCESS = "success"
STATUS_ERROR = "error"
@pytest.fixture(scope="function")
def create_test_image(request, http_client):
images = []
def create(image_type="hds", size=1, auto_delete=True):
file_prefix = str(uuid.uuid4())
file_name = f"{file_prefix}.{image_type}"
response = http_client.post(
"/files/create",
data={
"file_name": file_prefix,
"type": image_type,
"size": size,
},
)
if response.json()["status"] != STATUS_SUCCESS:
raise Exception("Failed to create temporary image")
if auto_delete:
images.append(file_name)
return file_name
def delete():
for image in images:
http_client.post("/files/delete", data={"file_name": image})
request.addfinalizer(delete)
return create
@pytest.fixture(scope="function")
def list_files(http_client):
def files():
return [f["name"] for f in http_client.get("/").json()["data"]["files"]]
return files
@pytest.fixture(scope="function")
def list_attached_images(http_client):
def files():
return http_client.get("/").json()["data"]["attached_images"]
return files
@pytest.fixture(scope="function")
def delete_file(http_client):
def delete(file_name):
http_client.post("/files/delete", data={"file_name": file_name})
return delete
@pytest.fixture(scope="function")
def detach_devices(http_client):
def detach():
response = http_client.post("/scsi/detach_all")
if response.json()["status"] == STATUS_SUCCESS:
return True
raise Exception("Failed to detach SCSI devices")
return detach

View File

@ -0,0 +1,43 @@
from conftest import STATUS_SUCCESS, STATUS_ERROR
# route("/login", methods=["POST"])
def test_login_with_valid_credentials(pytestconfig, http_client_unauthenticated):
response = http_client_unauthenticated.post(
"/login",
data={
"username": pytestconfig.getoption("rascsi_username"),
"password": pytestconfig.getoption("rascsi_password"),
},
)
response_data = response.json()
assert response.status_code == 200
assert response_data["status"] == STATUS_SUCCESS
assert "env" in response_data["data"]
# route("/login", methods=["POST"])
def test_login_with_invalid_credentials(http_client_unauthenticated):
response = http_client_unauthenticated.post(
"/login",
data={
"username": "__INVALID_USER__",
"password": "__INVALID_PASS__",
},
)
response_data = response.json()
assert response.status_code == 401
assert response_data["status"] == STATUS_ERROR
assert response_data["messages"][0]["message"] == (
"You must log in with valid credentials for a user in the 'rascsi' group"
)
# route("/logout")
def test_logout(http_client):
response = http_client.get("/logout")
assert response.status_code == 200

View File

@ -0,0 +1,227 @@
import pytest
from conftest import (
IMAGES_DIR,
SCSI_ID,
FILE_SIZE_1_MIB,
STATUS_SUCCESS,
)
# route("/scsi/attach", methods=["POST"])
def test_attach_image(http_client, create_test_image, detach_devices):
test_image = create_test_image()
response = http_client.post(
"/scsi/attach",
data={
"file_name": test_image,
"file_size": FILE_SIZE_1_MIB,
"scsi_id": SCSI_ID,
"unit": 0,
"type": "SCHD",
},
)
response_data = response.json()
assert response.status_code == 200
assert response_data["status"] == STATUS_SUCCESS
assert response_data["messages"][0]["message"] == (
f"Attached {test_image} as Hard Disk to SCSI ID {SCSI_ID} LUN 0"
)
# Cleanup
detach_devices()
# route("/scsi/attach_device", methods=["POST"])
@pytest.mark.parametrize(
"device_name,device_config",
[
# TODO: Fix networking in container, else SCBR attachment fails
# ("X68000 Host Bridge", {"type": "SCBR", "interface": "eth0", "inet": "10.10.20.1/24"}),
("DaynaPORT SCSI/Link", {"type": "SCDP", "interface": "eth0", "inet": "10.10.20.1/24"}),
("Host Services", {"type": "SCHS"}),
("Printer", {"type": "SCLP", "timeout": 30, "cmd": "lp -oraw %f"}),
],
)
def test_attach_device(http_client, detach_devices, device_name, device_config):
device_config["scsi_id"] = SCSI_ID
device_config["unit"] = 0
response = http_client.post(
"/scsi/attach_device",
data=device_config,
)
response_data = response.json()
assert response.status_code == 200
assert response_data["status"] == STATUS_SUCCESS
assert response_data["messages"][0]["message"] == (
f"Attached {device_name} to SCSI ID {SCSI_ID} LUN 0"
)
# Cleanup
detach_devices()
# route("/scsi/detach", methods=["POST"])
def test_detach_device(http_client, create_test_image):
test_image = create_test_image()
http_client.post(
"/scsi/attach",
data={
"file_name": test_image,
"file_size": FILE_SIZE_1_MIB,
"scsi_id": SCSI_ID,
"unit": 0,
"type": "SCHD",
},
)
response = http_client.post(
"/scsi/detach",
data={
"scsi_id": SCSI_ID,
"unit": 0,
},
)
response_data = response.json()
assert response.status_code == 200
assert response_data["status"] == STATUS_SUCCESS
assert response_data["messages"][0]["message"] == f"Detached SCSI ID {SCSI_ID} LUN 0"
# route("/scsi/detach_all", methods=["POST"])
def test_detach_all_devices(http_client, create_test_image, list_attached_images):
test_images = []
scsi_ids = [4, 5, 6]
for scsi_id in scsi_ids:
test_image = create_test_image()
test_images.append(test_image)
http_client.post(
"/scsi/attach",
data={
"file_name": test_image,
"file_size": FILE_SIZE_1_MIB,
"scsi_id": scsi_id,
"unit": 0,
"type": "SCHD",
},
)
assert list_attached_images() == test_images
response = http_client.post("/scsi/detach_all")
response_data = response.json()
assert response.status_code == 200
assert response_data["status"] == STATUS_SUCCESS
assert list_attached_images() == []
# route("/scsi/eject", methods=["POST"])
def test_eject_device(http_client, create_test_image, detach_devices):
test_image = create_test_image()
http_client.post(
"/scsi/attach",
data={
"file_name": test_image,
"file_size": FILE_SIZE_1_MIB,
"scsi_id": SCSI_ID,
"unit": 0,
"type": "SCCD", # CD-ROM
},
)
response = http_client.post(
"/scsi/eject",
data={
"scsi_id": SCSI_ID,
"unit": 0,
},
)
response_data = response.json()
assert response.status_code == 200
assert response_data["status"] == STATUS_SUCCESS
assert response_data["messages"][0]["message"] == f"Ejected SCSI ID {SCSI_ID} LUN 0"
# Cleanup
detach_devices()
# route("/scsi/info", methods=["POST"])
def test_show_device_info(http_client, create_test_image, detach_devices):
test_image = create_test_image()
http_client.post(
"/scsi/attach",
data={
"file_name": test_image,
"file_size": FILE_SIZE_1_MIB,
"scsi_id": SCSI_ID,
"unit": 0,
"type": "SCHD",
},
)
response = http_client.post(
"/scsi/info",
data={
"scsi_id": SCSI_ID,
},
)
response_data = response.json()
assert response.status_code == 200
assert response_data["status"] == STATUS_SUCCESS
assert "device_info" in response_data["data"]
assert response_data["data"]["device_info"]["file"] == f"{IMAGES_DIR}/{test_image}"
# Cleanup
detach_devices()
# route("/scsi/reserve", methods=["POST"])
# route("/scsi/release", methods=["POST"])
def test_reserve_and_release_device(http_client):
scsi_id = 0
response = http_client.post(
"/scsi/reserve",
data={
"scsi_id": scsi_id,
"memo": "TEST",
},
)
response_data = response.json()
assert response.status_code == 200
assert response_data["status"] == STATUS_SUCCESS
assert response_data["messages"][0]["message"] == f"Reserved SCSI ID {scsi_id}"
response = http_client.post(
"/scsi/release",
data={
"scsi_id": scsi_id,
},
)
response_data = response.json()
assert response.status_code == 200
assert response_data["status"] == STATUS_SUCCESS
assert response_data["messages"][0]["message"] == (
f"Released the reservation for SCSI ID {scsi_id}"
)

View File

@ -0,0 +1,317 @@
import pytest
import uuid
import os
from conftest import (
IMAGES_DIR,
AFP_DIR,
SCSI_ID,
FILE_SIZE_1_MIB,
STATUS_SUCCESS,
)
# route("/files/create", methods=["POST"])
def test_create_file(http_client, list_files, delete_file):
file_prefix = str(uuid.uuid4())
file_name = f"{file_prefix}.hds"
response = http_client.post(
"/files/create",
data={
"file_name": file_prefix,
"type": "hds",
"size": 1,
},
)
response_data = response.json()
assert response.status_code == 201
assert response_data["status"] == STATUS_SUCCESS
assert response_data["data"]["image"] == file_name
assert response_data["messages"][0]["message"] == f"Image file created: {file_name}"
assert file_name in list_files()
# Cleanup
delete_file(file_name)
# route("/files/rename", methods=["POST"])
def test_rename_file(http_client, create_test_image, list_files, delete_file):
original_file = create_test_image(auto_delete=False)
renamed_file = f"{uuid.uuid4()}.rename"
response = http_client.post(
"/files/rename",
data={"file_name": original_file, "new_file_name": renamed_file},
)
response_data = response.json()
assert response.status_code == 200
assert response_data["status"] == STATUS_SUCCESS
assert response_data["messages"][0]["message"] == f"Image file renamed to: {renamed_file}"
assert renamed_file in list_files()
# Cleanup
delete_file(renamed_file)
# route("/files/copy", methods=["POST"])
def test_copy_file(http_client, create_test_image, list_files, delete_file):
original_file = create_test_image()
copy_file = f"{uuid.uuid4()}.copy"
response = http_client.post(
"/files/copy",
data={
"file_name": original_file,
"copy_file_name": copy_file,
},
)
response_data = response.json()
files = list_files()
assert response.status_code == 200
assert response_data["status"] == STATUS_SUCCESS
assert response_data["messages"][0]["message"] == f"Copy of image file saved as: {copy_file}"
assert original_file in files
assert copy_file in files
# Cleanup
delete_file(copy_file)
# route("/files/delete", methods=["POST"])
def test_delete_file(http_client, create_test_image, list_files):
file_name = create_test_image()
response = http_client.post("/files/delete", data={"file_name": file_name})
response_data = response.json()
assert response.status_code == 200
assert response_data["status"] == STATUS_SUCCESS
assert response_data["messages"][0]["message"] == f"Image file deleted: {file_name}"
assert file_name not in list_files()
# route("/files/extract_image", methods=["POST"])
@pytest.mark.parametrize(
"archive_file_name,image_file_name",
[
("test_image.zip", "test_image_from_zip.hds"),
("test_image.sit", "test_image_from_sit.hds"),
("test_image.7z", "test_image_from_7z.hds"),
],
)
def test_extract_file(
httpserver, http_client, list_files, delete_file, archive_file_name, image_file_name
):
http_path = f"/images/{archive_file_name}"
url = httpserver.url_for(http_path)
with open(f"tests/assets/{archive_file_name}", mode="rb") as file:
zip_file_data = file.read()
httpserver.expect_request(http_path).respond_with_data(
zip_file_data,
mimetype="application/octet-stream",
)
http_client.post(
"/files/download_to_images",
data={
"url": url,
},
)
response = http_client.post(
"/files/extract_image",
data={
"archive_file": archive_file_name,
"archive_members": image_file_name,
},
)
response_data = response.json()
assert response.status_code == 200
assert response_data["status"] == STATUS_SUCCESS
assert response_data["messages"][0]["message"] == "Extracted 1 file(s)"
assert image_file_name in list_files()
# Cleanup
delete_file(archive_file_name)
delete_file(image_file_name)
# route("/files/upload", methods=["POST"])
def test_upload_file(http_client, delete_file):
file_name = f"{uuid.uuid4()}.test"
with open("tests/assets/test_image.hds", mode="rb") as file:
file.seek(0, os.SEEK_END)
file_size = file.tell()
file.seek(0, 0)
number_of_chunks = 4
# Note: The test file needs to be cleanly divisible by the chunk size
chunk_size = int(file_size / number_of_chunks)
for chunk_number in range(0, 4):
if chunk_number == 0:
chunk_byte_offset = 0
else:
chunk_byte_offset = chunk_number * chunk_size
form_data = {
"dzuuid": str(uuid.uuid4()),
"dzchunkindex": chunk_number,
"dzchunksize": chunk_size,
"dzchunkbyteoffset": chunk_byte_offset,
"dztotalfilesize": file_size,
"dztotalchunkcount": number_of_chunks,
}
file_data = {"file": (file_name, file.read(chunk_size))}
response = http_client.post(
"/files/upload",
data=form_data,
files=file_data,
)
assert response.status_code == 200
assert response.text == "File upload successful!"
file = [f for f in http_client.get("/").json()["data"]["files"] if f["name"] == file_name][0]
assert file["size"] == file_size
# Cleanup
delete_file(file_name)
# route("/files/download", methods=["POST"])
def test_download_file(http_client, create_test_image):
file_name = create_test_image()
response = http_client.post("/files/download", data={"file": f"{IMAGES_DIR}/{file_name}"})
assert response.status_code == 200
assert response.headers["content-type"] == "application/octet-stream"
assert response.headers["content-disposition"] == f"attachment; filename={file_name}"
assert response.headers["content-length"] == str(FILE_SIZE_1_MIB)
# route("/files/download_to_afp", methods=["POST"])
def test_download_url_to_afp_dir(httpserver, http_client):
file_name = str(uuid.uuid4())
http_path = f"/images/{file_name}"
url = httpserver.url_for(http_path)
with open("tests/assets/test_image.hds", mode="rb") as file:
file_data = file.read()
httpserver.expect_request(http_path).respond_with_data(
file_data,
mimetype="application/octet-stream",
)
response = http_client.post(
"/files/download_to_afp",
data={
"url": url,
},
)
response_data = response.json()
assert response.status_code == 200
assert response_data["status"] == STATUS_SUCCESS
assert response_data["messages"][0]["message"] == f"{file_name} downloaded to {AFP_DIR}"
# route("/files/download_to_images", methods=["POST"])
def test_download_url_to_images_dir(httpserver, http_client, list_files, delete_file):
file_name = str(uuid.uuid4())
http_path = f"/images/{file_name}"
url = httpserver.url_for(http_path)
with open("tests/assets/test_image.hds", mode="rb") as file:
test_file_data = file.read()
httpserver.expect_request(http_path).respond_with_data(
test_file_data,
mimetype="application/octet-stream",
)
response = http_client.post(
"/files/download_to_images",
data={
"url": url,
},
)
response_data = response.json()
assert response.status_code == 200
assert response_data["status"] == STATUS_SUCCESS
assert file_name in list_files()
assert response_data["messages"][0]["message"] == f"{file_name} downloaded to {IMAGES_DIR}"
# Cleanup
delete_file(file_name)
# route("/files/download_to_iso", methods=["POST"])
def test_download_url_to_iso(
httpserver,
http_client,
list_files,
list_attached_images,
detach_devices,
delete_file,
):
test_file_name = str(uuid.uuid4())
iso_file_name = f"{test_file_name}.iso"
http_path = f"/images/{test_file_name}"
url = httpserver.url_for(http_path)
with open("tests/assets/test_image.hds", mode="rb") as file:
test_file_data = file.read()
httpserver.expect_request(http_path).respond_with_data(
test_file_data,
mimetype="application/octet-stream",
)
response = http_client.post(
"/files/download_to_iso",
data={
"scsi_id": SCSI_ID,
"type": "-hfs",
"url": url,
},
)
response_data = response.json()
assert response.status_code == 200
assert response_data["status"] == STATUS_SUCCESS
assert iso_file_name in list_files()
assert iso_file_name in list_attached_images()
m = response_data["messages"]
assert m[0]["message"] == 'Created CD-ROM ISO image with arguments "-hfs"'
assert m[1]["message"] == f"Saved image as: {IMAGES_DIR}/{iso_file_name}"
assert m[2]["message"] == f"Attached to SCSI ID {SCSI_ID}"
# Cleanup
detach_devices()
delete_file(iso_file_name)

View File

@ -0,0 +1,99 @@
import uuid
from conftest import (
CFG_DIR,
FILE_SIZE_1_MIB,
STATUS_SUCCESS,
)
# route("/")
def test_index(http_client):
response = http_client.get("/")
response_data = response.json()
assert response.status_code == 200
assert response_data["status"] == STATUS_SUCCESS
assert "devices" in response_data["data"]
# route("/env")
def test_get_env_info(http_client):
response = http_client.get("/env")
response_data = response.json()
assert response.status_code == 200
assert response_data["status"] == STATUS_SUCCESS
assert "running_env" in response_data["data"]
# route("/pwa/<path:pwa_path>")
def test_pwa_route(http_client):
response = http_client.get("/pwa/favicon.ico")
assert response.status_code == 200
assert response.headers["content-disposition"] == "inline; filename=favicon.ico"
# route("/drive/list", methods=["GET"])
def test_show_named_drive_presets(http_client):
response = http_client.get("/drive/list")
response_data = response.json()
assert response.status_code == 200
assert response_data["status"] == STATUS_SUCCESS
assert "cd_conf" in response_data["data"]
assert "hd_conf" in response_data["data"]
assert "rm_conf" in response_data["data"]
# route("/drive/cdrom", methods=["POST"])
def test_create_cdrom_properties_file(http_client):
file_name = f"{uuid.uuid4()}.iso"
response = http_client.post(
"/drive/cdrom",
data={
"vendor": "TEST_AAA",
"product": "TEST_BBB",
"revision": "1.0A",
"block_size": 2048,
"file_name": file_name,
},
)
response_data = response.json()
assert response.status_code == 200
assert response_data["status"] == STATUS_SUCCESS
assert response_data["messages"][0]["message"] == (
f"File created: {CFG_DIR}/{file_name}.properties"
)
# route("/drive/create", methods=["POST"])
def test_create_image_with_properties_file(http_client, delete_file):
file_prefix = str(uuid.uuid4())
file_name = f"{file_prefix}.hds"
response = http_client.post(
"/drive/create",
data={
"vendor": "TEST_AAA",
"product": "TEST_BBB",
"revision": "1.0A",
"block_size": 512,
"size": FILE_SIZE_1_MIB,
"file_type": "hds",
"file_name": file_prefix,
},
)
response_data = response.json()
assert response.status_code == 200
assert response_data["status"] == STATUS_SUCCESS
assert response_data["messages"][0]["message"] == f"Image file created: {file_name}"
# Cleanup
delete_file(file_name)

View File

@ -0,0 +1,149 @@
import pytest
import uuid
from conftest import CFG_DIR, STATUS_SUCCESS
# route("/language", methods=["POST"])
@pytest.mark.parametrize(
"locale,confirm_message",
[
("de", "Webinterface-Sprache auf Deutsch geändert"),
("es", "Se ha cambiado el lenguaje de la Interfaz Web a español"),
("fr", "Langue de linterface web changée pour français"),
("sv", "Bytte webbgränssnittets språk till svenska"),
("en", "Changed Web Interface language to English"),
],
)
def test_set_language(http_client, locale, confirm_message):
response = http_client.post(
"/language",
data={
"locale": locale,
},
)
response_data = response.json()
assert response.status_code == 200
assert response_data["status"] == STATUS_SUCCESS
assert response_data["messages"][0]["message"] == confirm_message
# route("/logs/level", methods=["POST"])
@pytest.mark.parametrize("level", ["trace", "debug", "info", "warn", "err", "critical", "off"])
def test_set_log_level(http_client, level):
response = http_client.post(
"/logs/level",
data={
"level": level,
},
)
response_data = response.json()
assert response.status_code == 200
assert response_data["status"] == STATUS_SUCCESS
assert response_data["messages"][0]["message"] == f"Log level set to {level}"
# Cleanup
http_client.post(
"/logs/level",
data={
"level": "debug",
},
)
# route("/logs/show", methods=["POST"])
def test_show_logs(http_client):
response = http_client.post(
"/logs/show",
data={
"lines": 100,
"scope": "",
},
)
assert response.status_code == 200
assert response.headers["content-type"] == "text/plain"
# route("/config/save", methods=["POST"])
# route("/config/load", methods=["POST"])
def test_save_load_and_delete_configs(http_client):
config_name = str(uuid.uuid4())
config_json_file = f"{config_name}.json"
reserved_scsi_id = 0
reservation_memo = str(uuid.uuid4())
# Confirm the initial state
assert http_client.get("/").json()["data"]["RESERVATIONS"][0] == ""
# Save the initial state to a config
response = http_client.post(
"/config/save",
data={
"name": config_name,
},
)
response_data = response.json()
assert response.status_code == 200
assert response_data["status"] == STATUS_SUCCESS
assert response_data["messages"][0]["message"] == (
f"File created: {CFG_DIR}/{config_json_file}"
)
assert config_json_file in http_client.get("/").json()["data"]["config_files"]
# Modify the state
http_client.post(
"/scsi/reserve",
data={
"scsi_id": reserved_scsi_id,
"memo": reservation_memo,
},
)
assert http_client.get("/").json()["data"]["RESERVATIONS"][0] == reservation_memo
# Load the saved config
response = http_client.post(
"/config/load",
data={
"name": config_json_file,
"load": True,
},
)
response_data = response.json()
assert response.status_code == 200
assert response_data["status"] == STATUS_SUCCESS
assert response_data["messages"][0]["message"] == (
f"Loaded configurations from: {CFG_DIR}/{config_json_file}"
)
# Confirm the application has returned to its initial state
assert http_client.get("/").json()["data"]["RESERVATIONS"][0] == ""
# Delete the saved config
response = http_client.post(
"/config/load",
data={
"name": config_json_file,
"delete": True,
},
)
response_data = response.json()
assert response.status_code == 200
assert response_data["status"] == STATUS_SUCCESS
assert response_data["messages"][0]["message"] == (
f"File deleted: {CFG_DIR}/{config_json_file}"
)
assert config_json_file not in http_client.get("/").json()["data"]["config_files"]

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -0,0 +1,66 @@
import pytest
import requests
def pytest_addoption(parser):
parser.addoption("--base_url", action="store", default="http://localhost:8080")
parser.addoption("--httpserver_host", action="store", default="host.docker.internal")
parser.addoption("--httpserver_listen_address", action="store", default="127.0.0.1")
parser.addoption("--rascsi_username", action="store", default="pi")
parser.addoption("--rascsi_password", action="store", default="rascsi")
@pytest.fixture(scope="session")
def httpserver_listen_address(pytestconfig):
return (pytestconfig.getoption("httpserver_listen_address"), 0)
@pytest.fixture(scope="function", autouse=True)
def set_httpserver_hostname(pytestconfig, httpserver):
# The HTTP requests are made by Python from within the container so we need
# httpserver.url_for to generate URLs which point to the Docker host
httpserver.host = pytestconfig.getoption("httpserver_host")
@pytest.fixture(scope="session", autouse=True)
def ensure_all_devices_detached(create_http_client):
http_client = create_http_client()
http_client.post("/scsi/detach_all")
@pytest.fixture(scope="session")
def create_http_client(pytestconfig):
def create(authenticate=True):
session = requests.Session()
session.headers.update({"Accept": "application/json"})
session.original_request = session.request
def relative_request(method, url, *args, **kwargs):
if url[:4] != "http":
url = pytestconfig.getoption("base_url") + url
return session.original_request(method, url, *args, **kwargs)
session.request = relative_request
if authenticate:
session.post(
"/login",
data={
"username": pytestconfig.getoption("rascsi_username"),
"password": pytestconfig.getoption("rascsi_password"),
},
)
return session
return create
@pytest.fixture(scope="function")
def http_client(create_http_client):
return create_http_client(authenticate=True)
@pytest.fixture(scope="function")
def http_client_unauthenticated(create_http_client):
return create_http_client(authenticate=False)