diff --git a/.gitignore b/.gitignore
index 7a14233f..f9714d4f 100644
--- a/.gitignore
+++ b/.gitignore
@@ -11,6 +11,7 @@ src/raspberrypi/hfdisk/
*~
messages.pot
messages.mo
+report.xml
docker/docker-compose.override.yml
/docker/volumes/images/*
diff --git a/docker/rascsi-web/Dockerfile b/docker/rascsi-web/Dockerfile
index 4c4298d5..fbd8b7ac 100644
--- a/docker/rascsi-web/Dockerfile
+++ b/docker/rascsi-web/Dockerfile
@@ -6,18 +6,29 @@ FROM "${OS_ARCH}/${OS_DISTRO}:${OS_VERSION}"
EXPOSE 80 443
ARG DEBIAN_FRONTEND=noninteractive
-RUN apt-get update && apt-get install -y --no-install-recommends sudo rsyslog procps
+RUN apt-get update && apt-get install -y --no-install-recommends sudo systemd rsyslog procps
RUN groupadd pi
RUN useradd --create-home --shell /bin/bash -g pi pi
RUN echo "pi ALL=(ALL) NOPASSWD:ALL" >> /etc/sudoers
+RUN echo "pi:rascsi" | chpasswd
-WORKDIR /home/pi
+RUN mkdir /home/pi/afpshare
+RUN touch /etc/dhcpcd.conf
+RUN mkdir -p /etc/network/interfaces.d/
+
+WORKDIR /home/pi/RASCSI
USER pi
-COPY --chown=pi:pi . RASCSI
-RUN cd RASCSI && ./easyinstall.sh --run_choice=11 --skip-token
+COPY --chown=pi:pi . .
+
+# Standalone RaSCSI web UI
+RUN ./easyinstall.sh --run_choice=11 --skip-token
+
+# Wired network bridge
+RUN ./easyinstall.sh --run_choice=6 --headless
USER root
+WORKDIR /home/pi
RUN pip3 install watchdog
COPY docker/rascsi-web/start.sh /usr/local/bin/start.sh
RUN chmod +x /usr/local/bin/start.sh
diff --git a/docker/rascsi/Dockerfile b/docker/rascsi/Dockerfile
index 508bd185..4e3c60f5 100644
--- a/docker/rascsi/Dockerfile
+++ b/docker/rascsi/Dockerfile
@@ -6,15 +6,15 @@ FROM "${OS_ARCH}/${OS_DISTRO}:${OS_VERSION}"
EXPOSE 6868
ARG DEBIAN_FRONTEND=noninteractive
-RUN apt-get update && apt-get install -y --no-install-recommends sudo rsyslog patch
+RUN apt-get update && apt-get install -y --no-install-recommends sudo systemd rsyslog patch
RUN groupadd pi
RUN useradd --create-home --shell /bin/bash -g pi pi
RUN echo "pi ALL=(ALL) NOPASSWD:ALL" >> /etc/sudoers
-USER pi
-COPY --chown=pi:pi . /home/pi/RASCSI
WORKDIR /home/pi/RASCSI
+USER pi
+COPY --chown=pi:pi . .
# Workaround for Bullseye amd64 compilation error
# https://github.com/akuker/RASCSI/issues/821
@@ -24,6 +24,7 @@ RUN patch -p0 < docker/rascsi/cfilesystem.patch
RUN ./easyinstall.sh --run_choice=10 --cores=`nproc` --skip-token
USER root
+WORKDIR /home/pi
COPY docker/rascsi/rascsi_wrapper.sh /usr/local/bin/rascsi_wrapper.sh
RUN chmod +x /usr/local/bin/rascsi_wrapper.sh
CMD ["/usr/local/bin/rascsi_wrapper.sh", "-L", "trace", "-r", "7", "-F", "/home/pi/images"]
diff --git a/easyinstall.sh b/easyinstall.sh
index 77489f4e..69e4ef39 100755
--- a/easyinstall.sh
+++ b/easyinstall.sh
@@ -691,23 +691,31 @@ function setupWiredNetworking() {
echo "WARNING: If you continue, the IP address of your Pi may change upon reboot."
echo "Please make sure you will not lose access to the Pi system."
echo ""
- echo "Do you want to proceed with network configuration using the default settings? [Y/n]"
- read REPLY
- if [ "$REPLY" == "N" ] || [ "$REPLY" == "n" ]; then
- echo "Available wired interfaces on this system:"
- echo `ip -o addr show scope link | awk '{split($0, a); print $2}' | grep eth`
- echo "Please type the wired interface you want to use and press Enter:"
- read SELECTED
- LAN_INTERFACE=$SELECTED
+ if [[ -z $HEADLESS ]]; then
+ echo "Do you want to proceed with network configuration using the default settings? [Y/n]"
+ read REPLY
+
+ if [ "$REPLY" == "N" ] || [ "$REPLY" == "n" ]; then
+ echo "Available wired interfaces on this system:"
+ echo `ip -o addr show scope link | awk '{split($0, a); print $2}' | grep eth`
+ echo "Please type the wired interface you want to use and press Enter:"
+ read SELECTED
+ LAN_INTERFACE=$SELECTED
+ fi
fi
if [ "$(grep -c "^denyinterfaces" /etc/dhcpcd.conf)" -ge 1 ]; then
echo "WARNING: Network forwarding may already have been configured. Proceeding will overwrite the configuration."
- echo "Press enter to continue or CTRL-C to exit"
- read REPLY
+
+ if [[ -z $HEADLESS ]]; then
+ echo "Press enter to continue or CTRL-C to exit"
+ read REPLY
+ fi
+
sudo sed -i /^denyinterfaces/d /etc/dhcpcd.conf
fi
+
sudo bash -c 'echo "denyinterfaces '$LAN_INTERFACE'" >> /etc/dhcpcd.conf'
echo "Modified /etc/dhcpcd.conf"
@@ -720,6 +728,12 @@ function setupWiredNetworking() {
echo "Either use the Web UI, or do this on the command line (assuming SCSI ID 6):"
echo "rasctl -i 6 -c attach -t scdp -f $LAN_INTERFACE"
echo ""
+
+ if [[ $HEADLESS ]]; then
+ echo "Skipping reboot in headless mode"
+ return 0
+ fi
+
echo "We need to reboot your Pi"
echo "Press Enter to reboot or CTRL-C to exit"
read
@@ -1269,6 +1283,7 @@ function runChoice() {
preparePythonCommon
cachePipPackages
installRaScsiWebInterface
+ enableWebInterfaceAuth
echo "Configuring RaSCSI Web Interface stand-alone - Complete!"
echo "Launch the Web Interface with the 'start.sh' script. To use a custom port for the web server: 'start.sh --web-port=8081"
;;
@@ -1367,6 +1382,9 @@ while [ "$1" != "" ]; do
-s | --skip-token)
SKIP_TOKEN=1
;;
+ -h | --headless)
+ HEADLESS=1
+ ;;
*)
echo "ERROR: Unknown parameter \"$PARAM\""
exit 1
diff --git a/python/common/src/rascsi/file_cmds.py b/python/common/src/rascsi/file_cmds.py
index 11b2d9a6..de6f4971 100644
--- a/python/common/src/rascsi/file_cmds.py
+++ b/python/common/src/rascsi/file_cmds.py
@@ -521,6 +521,8 @@ class FileCmds:
# introduce more sophisticated format detection logic here.
if isinstance(config, dict):
self.ractl.detach_all()
+ for scsi_id in range(0, 8):
+ RESERVATIONS[scsi_id] = ""
ids_to_reserve = []
for item in config["reserved_ids"]:
ids_to_reserve.append(item["id"])
diff --git a/python/common/src/rascsi/ractl_cmds.py b/python/common/src/rascsi/ractl_cmds.py
index 4cc9c37a..25a001e0 100644
--- a/python/common/src/rascsi/ractl_cmds.py
+++ b/python/common/src/rascsi/ractl_cmds.py
@@ -40,7 +40,7 @@ class RaCtlCmds:
version = (str(result.server_info.version_info.major_version) + "." +
str(result.server_info.version_info.minor_version) + "." +
str(result.server_info.version_info.patch_version))
- log_levels = result.server_info.log_level_info.log_levels
+ log_levels = list(result.server_info.log_level_info.log_levels)
current_log_level = result.server_info.log_level_info.current_log_level
reserved_ids = list(result.server_info.reserved_ids_info.ids)
image_dir = result.server_info.image_files_info.default_image_folder
@@ -113,7 +113,7 @@ class RaCtlCmds:
result = proto.PbResult()
result.ParseFromString(data)
ifs = result.network_interfaces_info.name
- return {"status": result.status, "ifs": ifs}
+ return {"status": result.status, "ifs": list(ifs)}
def get_device_types(self):
"""
@@ -140,7 +140,7 @@ class RaCtlCmds:
"removable": device.properties.removable,
"supports_file": device.properties.supports_file,
"params": params,
- "block_sizes": device.properties.block_sizes,
+ "block_sizes": list(device.properties.block_sizes),
}
return {"status": result.status, "device_types": device_types}
@@ -394,7 +394,7 @@ class RaCtlCmds:
dpath = result.devices_info.devices[i].file.name
dfile = dpath.replace(image_files_info["images_dir"] + "/", "")
- dparam = result.devices_info.devices[i].params
+ dparam = dict(result.devices_info.devices[i].params)
dven = result.devices_info.devices[i].vendor
dprod = result.devices_info.devices[i].product
drev = result.devices_info.devices[i].revision
diff --git a/python/web/.flake8 b/python/web/.flake8
new file mode 100644
index 00000000..7da1f960
--- /dev/null
+++ b/python/web/.flake8
@@ -0,0 +1,2 @@
+[flake8]
+max-line-length = 100
diff --git a/python/web/pyproject.toml b/python/web/pyproject.toml
new file mode 100644
index 00000000..7adcc930
--- /dev/null
+++ b/python/web/pyproject.toml
@@ -0,0 +1,8 @@
+[tool.pytest.ini_options]
+addopts = "--junitxml=report.xml"
+log_cli = true
+log_cli_level = "warn"
+
+[tool.black]
+line-length = 100
+target-version = ['py37', 'py38', 'py39']
diff --git a/python/web/requirements-dev.txt b/python/web/requirements-dev.txt
new file mode 100644
index 00000000..b935690c
--- /dev/null
+++ b/python/web/requirements-dev.txt
@@ -0,0 +1,4 @@
+pytest==7.1.3
+pytest-httpserver==1.0.6
+black==22.8.0
+flake8==5.0.4
diff --git a/python/web/src/static/style.css b/python/web/src/static/style.css
index 48fc91c6..202b56d2 100644
--- a/python/web/src/static/style.css
+++ b/python/web/src/static/style.css
@@ -31,18 +31,33 @@ table, tr, td {
margin: none;
}
-.error {
- color: white;
- font-size:20px;
- background-color:red;
- white-space: pre-line;
+div.flash {
+ margin-top: 5px;
+ margin-bottom: 5px;
}
-.message {
+div.flash div {
color: white;
- font-size:20px;
- background-color:green;
+ font-size: 18px;
white-space: pre-line;
+ padding: 2px 5px;
+}
+
+div.flash div.success {
+ background-color: green;
+}
+
+div.flash div.warning {
+ background-color: orange;
+ color: black;
+}
+
+div.flash div.error {
+ background-color: red;
+}
+
+div.flash div.info {
+ background-color: #0d6efd;
}
td.inactive {
diff --git a/python/web/src/templates/base.html b/python/web/src/templates/base.html
index ce8553e1..2eb304de 100644
--- a/python/web/src/templates/base.html
+++ b/python/web/src/templates/base.html
@@ -1,7 +1,7 @@
- {{ _("RaSCSI Reloaded Control Page") }} [{{ host }}]
+ {{ _("RaSCSI Reloaded Control Page") }} [{{ env["host"] }}]
@@ -26,12 +26,12 @@
@@ -43,9 +43,9 @@
diff --git a/python/web/src/templates/drives.html b/python/web/src/templates/drives.html
index 76393faf..a9cdad95 100644
--- a/python/web/src/templates/drives.html
+++ b/python/web/src/templates/drives.html
@@ -135,7 +135,7 @@
{% endfor %}
- {{ _("%(disk_space)s MiB disk space remaining on the Pi", disk_space=free_disk) }}
+ {{ _("%(disk_space)s MiB disk space remaining on the Pi", disk_space=env["free_disk_space"]) }}
{{ _("Cancel") }}
{% endblock content %}
diff --git a/python/web/src/templates/index.html b/python/web/src/templates/index.html
index 4eb43528..d0ed8ef7 100644
--- a/python/web/src/templates/index.html
+++ b/python/web/src/templates/index.html
@@ -156,8 +156,16 @@
{{ _("Select a valid SCSI ID and LUN to attach to. Unless you know what you're doing, always use LUN 0.", url="https://en.wikipedia.org/wiki/Logical_unit_number") }}
{{ _("If RaSCSI was unable to detect the media type associated with the image, you get to choose the type from the dropdown.") }}
- {{ _("Recognized image file types: %(valid_image_suffixes)s", valid_image_suffixes=valid_image_suffixes) }}
- {{ _("Recognized archive file types: %(ARCHIVE_FILE_SUFFIXES)s", ARCHIVE_FILE_SUFFIXES=ARCHIVE_FILE_SUFFIXES) }}
+
+ {{ _("Recognized image file types:") }}
+ {% set comma = joiner(", ") %}
+ {% for extension in valid_image_suffixes %}{{ comma() }}.{{ extension}}{% endfor %}
+
+
+ {{ _("Recognized archive file types:") }}
+ {% set comma = joiner(", ") %}
+ {% for extension in ARCHIVE_FILE_SUFFIXES %}{{ comma() }}.{{ extension}}{% endfor %}
+
@@ -301,7 +309,7 @@
{% endfor %}
-{{ _("%(disk_space)s MiB disk space remaining on the Pi", disk_space=free_disk) }}
+{{ _("%(disk_space)s MiB disk space remaining on the Pi", disk_space=env["free_disk_space"]) }}
diff --git a/python/web/src/web.py b/python/web/src/web.py
index 9c567763..2af3d135 100644
--- a/python/web/src/web.py
+++ b/python/web/src/web.py
@@ -27,6 +27,7 @@ from flask import (
make_response,
session,
abort,
+ jsonify,
)
from rascsi.ractl_cmds import RaCtlCmds
@@ -67,6 +68,66 @@ from settings import (
APP = Flask(__name__)
BABEL = Babel(APP)
+def get_env_info():
+ """
+ Get information about the app/host environment
+ """
+ ip_addr, host = sys_cmd.get_ip_and_host()
+
+ if "username" in session:
+ username = session["username"]
+ else:
+ username = None
+
+ return {
+ "running_env": sys_cmd.running_env(),
+ "username": username,
+ "auth_active": auth_active(AUTH_GROUP)["status"],
+ "ip_addr": ip_addr,
+ "host": host,
+ "free_disk_space": int(sys_cmd.disk_space()["free"] / 1024 / 1024),
+ }
+
+
+def response(
+ template=None,
+ message=None,
+ redirect_url=None,
+ error=False,
+ status_code=200,
+ **kwargs
+):
+ """
+ Generates a HTML or JSON HTTP response
+ """
+ status = "error" if error else "success"
+
+ if isinstance(message, list):
+ messages = message
+ elif message is None:
+ messages = []
+ else:
+ messages = [(str(message), status)]
+
+ if request.headers.get("accept") == "application/json":
+ return jsonify({
+ "status": status,
+ "messages": [{"message": m, "category": c} for m, c in messages],
+ "data": kwargs
+ }), status_code
+
+ if messages:
+ for message, category in messages:
+ flash(message, category)
+
+ if template:
+ kwargs["env"] = get_env_info()
+ return render_template(template, **kwargs)
+
+ if redirect_url:
+ return redirect(url_for(redirect_url))
+ return redirect(url_for("index"))
+
@BABEL.localeselector
def get_locale():
@@ -87,12 +148,14 @@ def get_locale():
def get_supported_locales():
"""
- Returns a list of Locale objects that the Web Interfaces supports
+ Returns a list of languages supported by the web UI
"""
- locales = BABEL.list_translations()
- locales.append(Locale("en"))
- sorted_locales = sorted(locales, key=lambda x: x.language)
- return sorted_locales
+ locales = [
+ {"language": x.language, "display_name": x.display_name}
+ for x in [*BABEL.list_translations(), Locale("en")]
+ ]
+
+ return sorted(locales, key=lambda x: x["language"])
# pylint: disable=too-many-locals
@@ -147,7 +210,7 @@ def index():
server_info["scmo"]
)
- valid_image_suffixes = "." + ", .".join(
+ valid_image_suffixes = (
server_info["schd"] +
server_info["scrm"] +
server_info["scmo"] +
@@ -159,14 +222,12 @@ def index():
else:
username = None
- return render_template(
- "index.html",
+ return response(
+ template="index.html",
locales=get_supported_locales(),
bridge_configured=sys_cmd.is_bridge_setup(),
netatalk_configured=sys_cmd.running_proc("afpd"),
macproxy_configured=sys_cmd.running_proc("macproxy"),
- ip_addr=ip_addr,
- host=host,
devices=formatted_devices,
files=extended_image_files,
config_files=config_files,
@@ -181,28 +242,32 @@ def index():
reserved_scsi_ids=reserved_scsi_ids,
RESERVATIONS=RESERVATIONS,
max_file_size=int(int(MAX_FILE_SIZE) / 1024 / 1024),
- running_env=sys_cmd.running_env(),
version=server_info["version"],
log_levels=server_info["log_levels"],
current_log_level=server_info["current_log_level"],
netinfo=ractl_cmd.get_network_info(),
device_types=device_types,
- free_disk=int(sys_cmd.disk_space()["free"] / 1024 / 1024),
image_suffixes_to_create=image_suffixes_to_create,
valid_image_suffixes=valid_image_suffixes,
cdrom_file_suffix=tuple(server_info["sccd"]),
removable_file_suffix=tuple(server_info["scrm"]),
mo_file_suffix=tuple(server_info["scmo"]),
- username=username,
- auth_active=auth_active(AUTH_GROUP)["status"],
PROPERTIES_SUFFIX=PROPERTIES_SUFFIX,
- ARCHIVE_FILE_SUFFIXES="." + ", .".join(ARCHIVE_FILE_SUFFIXES),
+ ARCHIVE_FILE_SUFFIXES=ARCHIVE_FILE_SUFFIXES,
REMOVABLE_DEVICE_TYPES=ractl_cmd.get_removable_device_types(),
DISK_DEVICE_TYPES=ractl_cmd.get_disk_device_types(),
PERIPHERAL_DEVICE_TYPES=ractl_cmd.get_peripheral_device_types(),
)
+@APP.route("/env")
+def env():
+ """
+ Shows information about the app/host environment
+ """
+ return response(**get_env_info())
+
+
@APP.route("/drive/list", methods=["GET"])
def drive_list():
"""
@@ -211,23 +276,20 @@ def drive_list():
# Reads the canonical drive properties into a dict
# The file resides in the current dir of the web ui process
drive_properties = Path(DRIVE_PROPERTIES_FILE)
- if drive_properties.is_file():
- process = file_cmd.read_drive_properties(str(drive_properties))
- process = ReturnCodeMapper.add_msg(process)
- if not process["status"]:
- flash(process["msg"], "error")
- return redirect(url_for("index"))
- conf = process["conf"]
- else:
- flash(
- _(
- "Could not read drive properties from %(properties_file)s",
- properties_file=drive_properties,
- ),
- "error",
- )
- return redirect(url_for("index"))
+ if not drive_properties.is_file():
+ return response(
+ error=True,
+ message=_("Could not read drive properties from %(properties_file)s",
+ properties_file=drive_properties),
+ )
+ process = file_cmd.read_drive_properties(str(drive_properties))
+ process = ReturnCodeMapper.add_msg(process)
+
+ if not process["status"]:
+ return response(error=True, message=process["msg"])
+
+ conf = process["conf"]
hd_conf = []
cd_conf = []
rm_conf = []
@@ -245,30 +307,18 @@ def drive_list():
device["size_mb"] = "{:,.2f}".format(device["size"] / 1024 / 1024)
rm_conf.append(device)
- if "username" in session:
- username = session["username"]
- else:
- username = None
-
server_info = ractl_cmd.get_server_info()
- ip_addr, host = sys_cmd.get_ip_and_host()
- return render_template(
+ return response(
"drives.html",
files=file_cmd.list_images()["files"],
base_dir=server_info["image_dir"],
hd_conf=hd_conf,
cd_conf=cd_conf,
rm_conf=rm_conf,
- running_env=sys_cmd.running_env(),
version=server_info["version"],
- free_disk=int(sys_cmd.disk_space()["free"] / 1024 / 1024),
cdrom_file_suffix=tuple(server_info["sccd"]),
- username=username,
- auth_active=auth_active(AUTH_GROUP)["status"],
- ip_addr=ip_addr,
- host=host,
- )
+ )
@APP.route("/login", methods=["POST"])
@@ -278,20 +328,17 @@ def login():
"""
username = request.form["username"]
password = request.form["password"]
-
groups = [g.gr_name for g in getgrall() if username in g.gr_mem]
+
if AUTH_GROUP in groups:
if authenticate(str(username), str(password)):
session["username"] = request.form["username"]
- return redirect(url_for("index"))
- flash(
- _(
- "You must log in with credentials for a user in the '%(group)s' group",
- group=AUTH_GROUP,
- ),
- "error",
- )
- return redirect(url_for("index"))
+ return response(env=get_env_info())
+
+ return response(error=True, status_code=401, message=_(
+ "You must log in with valid credentials for a user in the '%(group)s' group",
+ group=AUTH_GROUP,
+ ))
@APP.route("/logout")
@@ -300,7 +347,7 @@ def logout():
Removes the logged in user from the session
"""
session.pop("username", None)
- return redirect(url_for("index"))
+ return response()
@APP.route("/pwa/")
@@ -319,8 +366,7 @@ def login_required(func):
def decorated_function(*args, **kwargs):
auth = auth_active(AUTH_GROUP)
if auth["status"] and "username" not in session:
- flash(auth["msg"], "error")
- return redirect(url_for("index"))
+ return response(error=True, message=auth["msg"])
return func(*args, **kwargs)
return decorated_function
@@ -342,11 +388,8 @@ def drive_create():
# Creating the image file
process = file_cmd.create_new_image(file_name, file_type, size)
- if process["status"]:
- flash(_("Image file created: %(file_name)s", file_name=full_file_name))
- else:
- flash(process["msg"], "error")
- return redirect(url_for("index"))
+ if not process["status"]:
+ return response(error=True, message=process["msg"])
# Creating the drive properties file
prop_file_name = f"{file_name}.{file_type}.{PROPERTIES_SUFFIX}"
@@ -358,12 +401,10 @@ def drive_create():
}
process = file_cmd.write_drive_properties(prop_file_name, properties)
process = ReturnCodeMapper.add_msg(process)
- if process["status"]:
- flash(process["msg"])
- return redirect(url_for("index"))
+ if not process["status"]:
+ return response(error=True, message=process["msg"])
- flash(process['msg'], "error")
- return redirect(url_for("index"))
+ return response(message=_("Image file created: %(file_name)s", file_name=full_file_name))
@APP.route("/drive/cdrom", methods=["POST"])
@@ -389,11 +430,9 @@ def drive_cdrom():
process = file_cmd.write_drive_properties(file_name, properties)
process = ReturnCodeMapper.add_msg(process)
if process["status"]:
- flash(process["msg"])
- return redirect(url_for("index"))
+ return response(message=process["msg"])
- flash(process['msg'], "error")
- return redirect(url_for("index"))
+ return response(error=True, message=process["msg"])
@APP.route("/config/save", methods=["POST"])
@@ -408,11 +447,9 @@ def config_save():
process = file_cmd.write_config(file_name)
process = ReturnCodeMapper.add_msg(process)
if process["status"]:
- flash(process["msg"])
- return redirect(url_for("index"))
+ return response(message=process["msg"])
- flash(process['msg'], "error")
- return redirect(url_for("index"))
+ return response(error=True, message=process["msg"])
@APP.route("/config/load", methods=["POST"])
@@ -427,24 +464,19 @@ def config_load():
process = file_cmd.read_config(file_name)
process = ReturnCodeMapper.add_msg(process)
if process["status"]:
- flash(process["msg"])
- return redirect(url_for("index"))
+ return response(message=process["msg"])
+
+ return response(error=True, message=process["msg"])
- flash(process['msg'], "error")
- return redirect(url_for("index"))
if "delete" in request.form:
process = file_cmd.delete_file(f"{CFG_DIR}/{file_name}")
process = ReturnCodeMapper.add_msg(process)
if process["status"]:
- flash(process["msg"])
- return redirect(url_for("index"))
+ return response(message=process["msg"])
- flash(process['msg'], "error")
- return redirect(url_for("index"))
+ return response(error=True, message=process["msg"])
- # The only reason we would reach here would be a Web UI bug. Will not localize.
- flash("Got an unhandled request (needs to be either load or delete)", "error")
- return redirect(url_for("index"))
+ return response(error=True, message="Action field (load, delete) missing")
@APP.route("/logs/show", methods=["POST"])
@@ -455,14 +487,16 @@ def show_logs():
lines = request.form.get("lines")
scope = request.form.get("scope")
+ # TODO: Render logs in a template (issue #836) and structured JSON
returncode, logs = sys_cmd.get_logs(lines, scope)
if returncode == 0:
headers = {"content-type": "text/plain"}
- return logs, int(lines), headers
+ return logs, headers
- flash(_("An error occurred when fetching logs."))
- flash(logs, "stderr")
- return redirect(url_for("index"))
+ return response(error=True, message=[
+ (_("An error occurred when fetching logs."), "error"),
+ (logs, "stderr"),
+ ])
@APP.route("/logs/level", methods=["POST"])
@@ -475,11 +509,9 @@ def log_level():
process = ractl_cmd.set_log_level(level)
if process["status"]:
- flash(_("Log level set to %(value)s", value=level))
- return redirect(url_for("index"))
+ return response(message=_("Log level set to %(value)s", value=level))
- flash(process["msg"], "error")
- return redirect(url_for("index"))
+ return response(error=True, message=process["msg"])
@APP.route("/scsi/attach_device", methods=["POST"])
@@ -505,11 +537,13 @@ def attach_device():
error_msg = _("Please follow the instructions at %(url)s", url=error_url)
if "interface" in params.keys():
+ # Note: is_bridge_configured returns False if the bridge is configured
bridge_status = is_bridge_configured(params["interface"])
if bridge_status:
- flash(bridge_status, "error")
- flash(error_msg, "error")
- return redirect(url_for("index"))
+ return response(error=True, message=[
+ (bridge_status, "error"),
+ (error_msg, "error")
+ ])
kwargs = {
"unit": int(unit),
@@ -519,16 +553,14 @@ def attach_device():
process = ractl_cmd.attach_device(scsi_id, **kwargs)
process = ReturnCodeMapper.add_msg(process)
if process["status"]:
- flash(_(
+ return response(message=_(
"Attached %(device_type)s to SCSI ID %(id_number)s LUN %(unit_number)s",
device_type=get_device_name(device_type),
id_number=scsi_id,
unit_number=unit,
))
- return redirect(url_for("index"))
- flash(process["msg"], "error")
- return redirect(url_for("index"))
+ return response(error=True, message=process["msg"])
@APP.route("/scsi/attach", methods=["POST"])
@@ -557,8 +589,7 @@ def attach_image():
process = file_cmd.read_drive_properties(drive_properties)
process = ReturnCodeMapper.add_msg(process)
if not process["status"]:
- flash(process["msg"], "error")
- return redirect(url_for("index"))
+ return response(error=True, message=process["msg"])
conf = process["conf"]
kwargs["vendor"] = conf["vendor"]
kwargs["product"] = conf["product"]
@@ -569,28 +600,31 @@ def attach_image():
process = ractl_cmd.attach_device(scsi_id, **kwargs)
process = ReturnCodeMapper.add_msg(process)
if process["status"]:
- flash(_(
+ response_messages = [(_(
"Attached %(file_name)s as %(device_type)s to "
"SCSI ID %(id_number)s LUN %(unit_number)s",
file_name=file_name,
device_type=get_device_name(device_type),
id_number=scsi_id,
unit_number=unit,
- ))
+ ), "success")]
+
if int(file_size) % int(expected_block_size):
- flash(_(
+ response_messages.append((_(
"The image file size %(file_size)s bytes is not a multiple of "
"%(block_size)s. RaSCSI will ignore the trailing data. "
"The image may be corrupted, so proceed with caution.",
file_size=file_size,
block_size=expected_block_size,
- ), "error")
- return redirect(url_for("index"))
+ ), "warning"))
- flash(_("Failed to attach %(file_name)s to SCSI ID %(id_number)s LUN %(unit_number)s",
- file_name=file_name, id_number=scsi_id, unit_number=unit), "error")
- flash(process["msg"], "error")
- return redirect(url_for("index"))
+ return response(message=response_messages)
+
+ return response(error=True, message=[
+ (_("Failed to attach %(file_name)s to SCSI ID %(id_number)s LUN %(unit_number)s",
+ file_name=file_name, id_number=scsi_id, unit_number=unit), "error"),
+ (process["msg"], "error"),
+ ])
@APP.route("/scsi/detach_all", methods=["POST"])
@@ -601,11 +635,9 @@ def detach_all_devices():
"""
process = ractl_cmd.detach_all()
if process["status"]:
- flash(_("Detached all SCSI devices"))
- return redirect(url_for("index"))
+ return response(message=_("Detached all SCSI devices"))
- flash(process["msg"], "error")
- return redirect(url_for("index"))
+ return response(error=True, message=process["msg"])
@APP.route("/scsi/detach", methods=["POST"])
@@ -618,14 +650,14 @@ def detach():
unit = request.form.get("unit")
process = ractl_cmd.detach_by_id(scsi_id, unit)
if process["status"]:
- flash(_("Detached SCSI ID %(id_number)s LUN %(unit_number)s",
- id_number=scsi_id, unit_number=unit))
- return redirect(url_for("index"))
+ return response(message=_("Detached SCSI ID %(id_number)s LUN %(unit_number)s",
+ id_number=scsi_id, unit_number=unit))
- flash(_("Failed to detach SCSI ID %(id_number)s LUN %(unit_number)s",
- id_number=scsi_id, unit_number=unit), "error")
- flash(process["msg"], "error")
- return redirect(url_for("index"))
+ return response(error=True, message=[
+ (_("Failed to detach SCSI ID %(id_number)s LUN %(unit_number)s",
+ id_number=scsi_id, unit_number=unit), "error"),
+ (process["msg"], "error"),
+ ])
@APP.route("/scsi/eject", methods=["POST"])
@@ -639,14 +671,15 @@ def eject():
process = ractl_cmd.eject_by_id(scsi_id, unit)
if process["status"]:
- flash(_("Ejected SCSI ID %(id_number)s LUN %(unit_number)s",
- id_number=scsi_id, unit_number=unit))
- return redirect(url_for("index"))
+ return response(message=_("Ejected SCSI ID %(id_number)s LUN %(unit_number)s",
+ id_number=scsi_id, unit_number=unit))
+
+ return response(error=True, message=[
+ (_("Failed to eject SCSI ID %(id_number)s LUN %(unit_number)s",
+ id_number=scsi_id, unit_number=unit), "error"),
+ (process["msg"], "error"),
+ ])
- flash(_("Failed to eject SCSI ID %(id_number)s LUN %(unit_number)s",
- id_number=scsi_id, unit_number=unit), "error")
- flash(process["msg"], "error")
- return redirect(url_for("index"))
@APP.route("/scsi/info", methods=["POST"])
def device_info():
@@ -660,29 +693,48 @@ def device_info():
# First check if any device at all was returned
if not devices["status"]:
- flash(devices["msg"], "error")
- return redirect(url_for("index"))
+ return response(error=True, message=devices["msg"])
# Looking at the first dict in list to get
# the one and only device that should have been returned
device = devices["device_list"][0]
if str(device["id"]) == scsi_id:
- flash(_("DEVICE INFO"))
- flash("===========")
- flash(_("SCSI ID: %(id_number)s", id_number=device["id"]))
- flash(_("LUN: %(unit_number)s", unit_number=device["unit"]))
- flash(_("Type: %(device_type)s", device_type=device["device_type"]))
- flash(_("Status: %(device_status)s", device_status=device["status"]))
- flash(_("File: %(image_file)s", image_file=device["image"]))
- flash(_("Parameters: %(value)s", value=device["params"]))
- flash(_("Vendor: %(value)s", value=device["vendor"]))
- flash(_("Product: %(value)s", value=device["product"]))
- flash(_("Revision: %(revision_number)s", revision_number=device["revision"]))
- flash(_("Block Size: %(value)s bytes", value=device["block_size"]))
- flash(_("Image Size: %(value)s bytes", value=device["size"]))
- return redirect(url_for("index"))
+ # TODO: Move the device info to the template instead of a flash message
+ message = "\n".join([
+ _("DEVICE INFO"),
+ "===========",
+ _("SCSI ID: %(id_number)s", id_number=device["id"]),
+ _("LUN: %(unit_number)s", unit_number=device["unit"]),
+ _("Type: %(device_type)s", device_type=device["device_type"]),
+ _("Status: %(device_status)s", device_status=device["status"]),
+ _("File: %(image_file)s", image_file=device["image"]),
+ _("Parameters: %(value)s", value=device["params"]),
+ _("Vendor: %(value)s", value=device["vendor"]),
+ _("Product: %(value)s", value=device["product"]),
+ _("Revision: %(revision_number)s", revision_number=device["revision"]),
+ _("Block Size: %(value)s bytes", value=device["block_size"]),
+ _("Image Size: %(value)s bytes", value=device["size"]),
+ ])
+
+ # Don't send redundant "info" message with the JSON response
+ if request.headers.get("accept") == "application/json":
+ return response(device_info={
+ "scsi_id": device["id"],
+ "lun": device["unit"],
+ "device_type": device["device_type"],
+ "status": device["status"],
+ "file": device["image"],
+ "parameters": device["params"],
+ "vendor": device["vendor"],
+ "product": device["product"],
+ "revision": device["revision"],
+ "block_size": device["block_size"],
+ "size": device["size"],
+ })
+
+ return response(message=[(message, "info")])
+
+ return response(error=True, message=devices["msg"])
- flash(devices["msg"], "error")
- return redirect(url_for("index"))
@APP.route("/scsi/reserve", methods=["POST"])
@login_required
@@ -697,12 +749,13 @@ def reserve_id():
process = ractl_cmd.reserve_scsi_ids(reserved_ids)
if process["status"]:
RESERVATIONS[int(scsi_id)] = memo
- flash(_("Reserved SCSI ID %(id_number)s", id_number=scsi_id))
- return redirect(url_for("index"))
+ return response(message=_("Reserved SCSI ID %(id_number)s", id_number=scsi_id))
+
+ return response(error=True, message=[
+ (_("Failed to reserve SCSI ID %(id_number)s", id_number=scsi_id), "error"),
+ (process["msg"], "error"),
+ ])
- flash(_("Failed to reserve SCSI ID %(id_number)s", id_number=scsi_id))
- flash(process["msg"], "error")
- return redirect(url_for("index"))
@APP.route("/scsi/release", methods=["POST"])
@login_required
@@ -716,12 +769,12 @@ def release_id():
process = ractl_cmd.reserve_scsi_ids(reserved_ids)
if process["status"]:
RESERVATIONS[int(scsi_id)] = ""
- flash(_("Released the reservation for SCSI ID %(id_number)s", id_number=scsi_id))
- return redirect(url_for("index"))
+ return response(message=_("Released the reservation for SCSI ID %(id_number)s", id_number=scsi_id))
- flash(_("Failed to release the reservation for SCSI ID %(id_number)s", id_number=scsi_id))
- flash(process["msg"], "error")
- return redirect(url_for("index"))
+ return response(error=True, message=[
+ (_("Failed to release the reservation for SCSI ID %(id_number)s", id_number=scsi_id), "error"),
+ (process["msg"], "error"),
+ ])
@APP.route("/pi/reboot", methods=["POST"])
@@ -731,7 +784,7 @@ def restart():
Restarts the Pi
"""
ractl_cmd.shutdown_pi("reboot")
- return redirect(url_for("index"))
+ return response()
@APP.route("/pi/shutdown", methods=["POST"])
@@ -741,7 +794,7 @@ def shutdown():
Shuts down the Pi
"""
ractl_cmd.shutdown_pi("system")
- return redirect(url_for("index"))
+ return response()
@APP.route("/files/download_to_iso", methods=["POST"])
@@ -753,16 +806,18 @@ def download_to_iso():
scsi_id = request.form.get("scsi_id")
url = request.form.get("url")
iso_args = request.form.get("type").split()
+ response_messages = []
process = file_cmd.download_file_to_iso(url, *iso_args)
process = ReturnCodeMapper.add_msg(process)
- if process["status"]:
- flash(process["msg"])
- flash(_("Saved image as: %(file_name)s", file_name=process['file_name']))
- else:
- flash(_("Failed to create CD-ROM image from %(url)s", url=url), "error")
- flash(process["msg"], "error")
- return redirect(url_for("index"))
+ if not process["status"]:
+ return response(error=True, message=[
+ (_("Failed to create CD-ROM image from %(url)s", url=url), "error"),
+ (process["msg"], "error"),
+ ])
+
+ response_messages.append((process["msg"], "success"))
+ response_messages.append((_("Saved image as: %(file_name)s", file_name=process['file_name']), "success"))
process_attach = ractl_cmd.attach_device(
scsi_id,
@@ -771,13 +826,13 @@ def download_to_iso():
)
process_attach = ReturnCodeMapper.add_msg(process_attach)
if process_attach["status"]:
- flash(_("Attached to SCSI ID %(id_number)s", id_number=scsi_id))
- return redirect(url_for("index"))
+ response_messages.append((_("Attached to SCSI ID %(id_number)s", id_number=scsi_id), "success"))
+ return response(message=response_messages)
- flash(_("Failed to attach image to SCSI ID %(id_number)s. Try attaching it manually.",
- id_number=scsi_id), "error")
- flash(process_attach["msg"], "error")
- return redirect(url_for("index"))
+ return response(error=True, message=[
+ (_("Failed to attach image to SCSI ID %(id_number)s. Try attaching it manually.", id_number=scsi_id), "error"),
+ (process_attach["msg"], "error"),
+ ])
@APP.route("/files/download_to_images", methods=["POST"])
@@ -791,12 +846,12 @@ def download_img():
process = file_cmd.download_to_dir(url, server_info["image_dir"], Path(url).name)
process = ReturnCodeMapper.add_msg(process)
if process["status"]:
- flash(process["msg"])
- return redirect(url_for("index"))
+ return response(message=process["msg"])
- flash(_("Failed to download file from %(url)s", url=url), "error")
- flash(process["msg"], "error")
- return redirect(url_for("index"))
+ return response(error=True, message=[
+ (_("Failed to download file from %(url)s", url=url), "error"),
+ (process["msg"], "error"),
+ ])
@APP.route("/files/download_to_afp", methods=["POST"])
@@ -810,12 +865,12 @@ def download_afp():
process = file_cmd.download_to_dir(url, AFP_DIR, file_name)
process = ReturnCodeMapper.add_msg(process)
if process["status"]:
- flash(process["msg"])
- return redirect(url_for("index"))
+ return response(message=process["msg"])
- flash(_("Failed to download file from %(url)s", url=url), "error")
- flash(process["msg"], "error")
- return redirect(url_for("index"))
+ return response(error=True, message=[
+ (_("Failed to download file from %(url)s", url=url), "error"),
+ (process["msg"], "error"),
+ ])
@APP.route("/files/upload", methods=["POST"])
@@ -846,11 +901,13 @@ def create_file():
process = file_cmd.create_new_image(file_name, file_type, size)
if process["status"]:
- flash(_("Image file created: %(file_name)s", file_name=full_file_name))
- return redirect(url_for("index"))
+ return response(
+ status_code=201,
+ message=_("Image file created: %(file_name)s", file_name=full_file_name),
+ image=full_file_name,
+ )
- flash(process["msg"], "error")
- return redirect(url_for("index"))
+ return response(error=True, message=process["msg"])
@APP.route("/files/download", methods=["POST"])
@@ -872,26 +929,23 @@ def delete():
file_name = request.form.get("file_name")
process = file_cmd.delete_image(file_name)
- if process["status"]:
- flash(_("Image file deleted: %(file_name)s", file_name=file_name))
- else:
- flash(process["msg"], "error")
- return redirect(url_for("index"))
+ if not process["status"]:
+ return response(error=True, message=process["msg"])
+
+ response_messages = [
+ (_("Image file deleted: %(file_name)s", file_name=file_name), "success")]
# Delete the drive properties file, if it exists
prop_file_path = f"{CFG_DIR}/{file_name}.{PROPERTIES_SUFFIX}"
if Path(prop_file_path).is_file():
process = file_cmd.delete_file(prop_file_path)
process = ReturnCodeMapper.add_msg(process)
-
if process["status"]:
- flash(process["msg"])
- return redirect(url_for("index"))
+ response_messages.append((process["msg"], "success"))
+ else:
+ response_messages.append((process["msg"], "error"))
- flash(process["msg"], "error")
- return redirect(url_for("index"))
-
- return redirect(url_for("index"))
+ return response(message=response_messages)
@APP.route("/files/rename", methods=["POST"])
@@ -904,11 +958,11 @@ def rename():
new_file_name = request.form.get("new_file_name")
process = file_cmd.rename_image(file_name, new_file_name)
- if process["status"]:
- flash(_("Image file renamed to: %(file_name)s", file_name=new_file_name))
- else:
- flash(process["msg"], "error")
- return redirect(url_for("index"))
+ if not process["status"]:
+ return response(error=True, message=process["msg"])
+
+ response_messages = [
+ (_("Image file renamed to: %(file_name)s", file_name=new_file_name), "success")]
# Rename the drive properties file, if it exists
prop_file_path = f"{CFG_DIR}/{file_name}.{PROPERTIES_SUFFIX}"
@@ -917,13 +971,11 @@ def rename():
process = file_cmd.rename_file(prop_file_path, new_prop_file_path)
process = ReturnCodeMapper.add_msg(process)
if process["status"]:
- flash(process["msg"])
- return redirect(url_for("index"))
+ response_messages.append((process["msg"], "success"))
+ else:
+ response_messages.append((process["msg"], "error"))
- flash(process["msg"], "error")
- return redirect(url_for("index"))
-
- return redirect(url_for("index"))
+ return response(message=response_messages)
@APP.route("/files/copy", methods=["POST"])
@@ -936,11 +988,11 @@ def copy():
new_file_name = request.form.get("copy_file_name")
process = file_cmd.copy_image(file_name, new_file_name)
- if process["status"]:
- flash(_("Copy of image file saved as: %(file_name)s", file_name=new_file_name))
- else:
- flash(process["msg"], "error")
- return redirect(url_for("index"))
+ if not process["status"]:
+ return response(error=True, message=process["msg"])
+
+ response_messages = [
+ (_("Copy of image file saved as: %(file_name)s", file_name=new_file_name), "success")]
# Create a copy of the drive properties file, if it exists
prop_file_path = f"{CFG_DIR}/{file_name}.{PROPERTIES_SUFFIX}"
@@ -949,13 +1001,11 @@ def copy():
process = file_cmd.copy_file(prop_file_path, new_prop_file_path)
process = ReturnCodeMapper.add_msg(process)
if process["status"]:
- flash(process["msg"])
- return redirect(url_for("index"))
+ response_messages.append((process["msg"], "success"))
+ else:
+ response_messages.append((process["msg"], "error"))
- flash(process["msg"], "error")
- return redirect(url_for("index"))
-
- return redirect(url_for("index"))
+ return response(message=response_messages)
@APP.route("/files/extract_image", methods=["POST"])
@@ -974,23 +1024,23 @@ def extract_image():
)
if extract_result["return_code"] == ReturnCodes.EXTRACTIMAGE_SUCCESS:
- flash(ReturnCodeMapper.add_msg(extract_result).get("msg"))
+ response_messages = [(ReturnCodeMapper.add_msg(extract_result).get("msg"), "success")]
for properties_file in extract_result["properties_files_moved"]:
if properties_file["status"]:
- flash(_("Properties file %(file)s moved to %(directory)s",
+ response_messages.append((_("Properties file %(file)s moved to %(directory)s",
file=properties_file['name'],
directory=CFG_DIR
- ))
+ ), "success"))
else:
- flash(_("Failed to move properties file %(file)s to %(directory)s",
+ response_messages.append((_("Failed to move properties file %(file)s to %(directory)s",
file=properties_file['name'],
directory=CFG_DIR
- ), "error")
- else:
- flash(ReturnCodeMapper.add_msg(extract_result).get("msg"), "error")
+ ), "error"))
- return redirect(url_for("index"))
+ return response(message=response_messages)
+
+ return response(error=True, message=ReturnCodeMapper.add_msg(extract_result).get("msg"))
@APP.route("/language", methods=["POST"])
@@ -1006,8 +1056,7 @@ def change_language():
language = Locale.parse(locale)
language_name = language.get_language_name(locale)
- flash(_("Changed Web Interface language to %(locale)s", locale=language_name))
- return redirect(url_for("index"))
+ return response(message=_("Changed Web Interface language to %(locale)s", locale=language_name))
@APP.before_first_request
diff --git a/python/web/start.sh b/python/web/start.sh
index f9e5b26b..c96d3ef3 100755
--- a/python/web/start.sh
+++ b/python/web/start.sh
@@ -62,8 +62,7 @@ if ! test -e venv; then
pip3 install wheel
pip3 install -r requirements.txt
- git rev-parse --is-inside-work-tree &> /dev/null
- if [[ $? -eq 0 ]]; then
+ if git rev-parse --is-inside-work-tree &> /dev/null; then
git rev-parse HEAD > current
fi
fi
diff --git a/python/web/tests/.gitkeep b/python/web/tests/.gitkeep
deleted file mode 100644
index e69de29b..00000000
diff --git a/python/web/tests/api/conftest.py b/python/web/tests/api/conftest.py
new file mode 100644
index 00000000..76735679
--- /dev/null
+++ b/python/web/tests/api/conftest.py
@@ -0,0 +1,78 @@
+import pytest
+import uuid
+
+CFG_DIR = "/home/pi/.config/rascsi"
+IMAGES_DIR = "/home/pi/images"
+AFP_DIR = "/home/pi/afpshare"
+SCSI_ID = 6
+FILE_SIZE_1_MIB = 1048576
+STATUS_SUCCESS = "success"
+STATUS_ERROR = "error"
+
+
+@pytest.fixture(scope="function")
+def create_test_image(request, http_client):
+ images = []
+
+ def create(image_type="hds", size=1, auto_delete=True):
+ file_prefix = str(uuid.uuid4())
+ file_name = f"{file_prefix}.{image_type}"
+
+ response = http_client.post(
+ "/files/create",
+ data={
+ "file_name": file_prefix,
+ "type": image_type,
+ "size": size,
+ },
+ )
+
+ if response.json()["status"] != STATUS_SUCCESS:
+ raise Exception("Failed to create temporary image")
+
+ if auto_delete:
+ images.append(file_name)
+
+ return file_name
+
+ def delete():
+ for image in images:
+ http_client.post("/files/delete", data={"file_name": image})
+
+ request.addfinalizer(delete)
+ return create
+
+
+@pytest.fixture(scope="function")
+def list_files(http_client):
+ def files():
+ return [f["name"] for f in http_client.get("/").json()["data"]["files"]]
+
+ return files
+
+
+@pytest.fixture(scope="function")
+def list_attached_images(http_client):
+ def files():
+ return http_client.get("/").json()["data"]["attached_images"]
+
+ return files
+
+
+@pytest.fixture(scope="function")
+def delete_file(http_client):
+ def delete(file_name):
+ http_client.post("/files/delete", data={"file_name": file_name})
+
+ return delete
+
+
+@pytest.fixture(scope="function")
+def detach_devices(http_client):
+ def detach():
+ response = http_client.post("/scsi/detach_all")
+ if response.json()["status"] == STATUS_SUCCESS:
+ return True
+ raise Exception("Failed to detach SCSI devices")
+
+ return detach
diff --git a/python/web/tests/api/test_auth.py b/python/web/tests/api/test_auth.py
new file mode 100644
index 00000000..3a720ea9
--- /dev/null
+++ b/python/web/tests/api/test_auth.py
@@ -0,0 +1,43 @@
+from conftest import STATUS_SUCCESS, STATUS_ERROR
+
+
+# route("/login", methods=["POST"])
+def test_login_with_valid_credentials(pytestconfig, http_client_unauthenticated):
+ response = http_client_unauthenticated.post(
+ "/login",
+ data={
+ "username": pytestconfig.getoption("rascsi_username"),
+ "password": pytestconfig.getoption("rascsi_password"),
+ },
+ )
+
+ response_data = response.json()
+
+ assert response.status_code == 200
+ assert response_data["status"] == STATUS_SUCCESS
+ assert "env" in response_data["data"]
+
+
+# route("/login", methods=["POST"])
+def test_login_with_invalid_credentials(http_client_unauthenticated):
+ response = http_client_unauthenticated.post(
+ "/login",
+ data={
+ "username": "__INVALID_USER__",
+ "password": "__INVALID_PASS__",
+ },
+ )
+
+ response_data = response.json()
+
+ assert response.status_code == 401
+ assert response_data["status"] == STATUS_ERROR
+ assert response_data["messages"][0]["message"] == (
+ "You must log in with valid credentials for a user in the 'rascsi' group"
+ )
+
+
+# route("/logout")
+def test_logout(http_client):
+ response = http_client.get("/logout")
+ assert response.status_code == 200
diff --git a/python/web/tests/api/test_devices.py b/python/web/tests/api/test_devices.py
new file mode 100644
index 00000000..640f20bd
--- /dev/null
+++ b/python/web/tests/api/test_devices.py
@@ -0,0 +1,227 @@
+import pytest
+
+from conftest import (
+ IMAGES_DIR,
+ SCSI_ID,
+ FILE_SIZE_1_MIB,
+ STATUS_SUCCESS,
+)
+
+
+# route("/scsi/attach", methods=["POST"])
+def test_attach_image(http_client, create_test_image, detach_devices):
+ test_image = create_test_image()
+
+ response = http_client.post(
+ "/scsi/attach",
+ data={
+ "file_name": test_image,
+ "file_size": FILE_SIZE_1_MIB,
+ "scsi_id": SCSI_ID,
+ "unit": 0,
+ "type": "SCHD",
+ },
+ )
+ response_data = response.json()
+
+ assert response.status_code == 200
+ assert response_data["status"] == STATUS_SUCCESS
+ assert response_data["messages"][0]["message"] == (
+ f"Attached {test_image} as Hard Disk to SCSI ID {SCSI_ID} LUN 0"
+ )
+
+ # Cleanup
+ detach_devices()
+
+
+# route("/scsi/attach_device", methods=["POST"])
+@pytest.mark.parametrize(
+ "device_name,device_config",
+ [
+ # TODO: Fix networking in container, else SCBR attachment fails
+ # ("X68000 Host Bridge", {"type": "SCBR", "interface": "eth0", "inet": "10.10.20.1/24"}),
+ ("DaynaPORT SCSI/Link", {"type": "SCDP", "interface": "eth0", "inet": "10.10.20.1/24"}),
+ ("Host Services", {"type": "SCHS"}),
+ ("Printer", {"type": "SCLP", "timeout": 30, "cmd": "lp -oraw %f"}),
+ ],
+)
+def test_attach_device(http_client, detach_devices, device_name, device_config):
+ device_config["scsi_id"] = SCSI_ID
+ device_config["unit"] = 0
+
+ response = http_client.post(
+ "/scsi/attach_device",
+ data=device_config,
+ )
+
+ response_data = response.json()
+
+ assert response.status_code == 200
+ assert response_data["status"] == STATUS_SUCCESS
+ assert response_data["messages"][0]["message"] == (
+ f"Attached {device_name} to SCSI ID {SCSI_ID} LUN 0"
+ )
+
+ # Cleanup
+ detach_devices()
+
+
+# route("/scsi/detach", methods=["POST"])
+def test_detach_device(http_client, create_test_image):
+ test_image = create_test_image()
+
+ http_client.post(
+ "/scsi/attach",
+ data={
+ "file_name": test_image,
+ "file_size": FILE_SIZE_1_MIB,
+ "scsi_id": SCSI_ID,
+ "unit": 0,
+ "type": "SCHD",
+ },
+ )
+
+ response = http_client.post(
+ "/scsi/detach",
+ data={
+ "scsi_id": SCSI_ID,
+ "unit": 0,
+ },
+ )
+
+ response_data = response.json()
+
+ assert response.status_code == 200
+ assert response_data["status"] == STATUS_SUCCESS
+ assert response_data["messages"][0]["message"] == f"Detached SCSI ID {SCSI_ID} LUN 0"
+
+
+# route("/scsi/detach_all", methods=["POST"])
+def test_detach_all_devices(http_client, create_test_image, list_attached_images):
+ test_images = []
+ scsi_ids = [4, 5, 6]
+
+ for scsi_id in scsi_ids:
+ test_image = create_test_image()
+ test_images.append(test_image)
+
+ http_client.post(
+ "/scsi/attach",
+ data={
+ "file_name": test_image,
+ "file_size": FILE_SIZE_1_MIB,
+ "scsi_id": scsi_id,
+ "unit": 0,
+ "type": "SCHD",
+ },
+ )
+
+ assert list_attached_images() == test_images
+
+ response = http_client.post("/scsi/detach_all")
+ response_data = response.json()
+
+ assert response.status_code == 200
+ assert response_data["status"] == STATUS_SUCCESS
+ assert list_attached_images() == []
+
+
+# route("/scsi/eject", methods=["POST"])
+def test_eject_device(http_client, create_test_image, detach_devices):
+ test_image = create_test_image()
+
+ http_client.post(
+ "/scsi/attach",
+ data={
+ "file_name": test_image,
+ "file_size": FILE_SIZE_1_MIB,
+ "scsi_id": SCSI_ID,
+ "unit": 0,
+ "type": "SCCD", # CD-ROM
+ },
+ )
+
+ response = http_client.post(
+ "/scsi/eject",
+ data={
+ "scsi_id": SCSI_ID,
+ "unit": 0,
+ },
+ )
+
+ response_data = response.json()
+
+ assert response.status_code == 200
+ assert response_data["status"] == STATUS_SUCCESS
+ assert response_data["messages"][0]["message"] == f"Ejected SCSI ID {SCSI_ID} LUN 0"
+
+ # Cleanup
+ detach_devices()
+
+
+# route("/scsi/info", methods=["POST"])
+def test_show_device_info(http_client, create_test_image, detach_devices):
+ test_image = create_test_image()
+
+ http_client.post(
+ "/scsi/attach",
+ data={
+ "file_name": test_image,
+ "file_size": FILE_SIZE_1_MIB,
+ "scsi_id": SCSI_ID,
+ "unit": 0,
+ "type": "SCHD",
+ },
+ )
+
+ response = http_client.post(
+ "/scsi/info",
+ data={
+ "scsi_id": SCSI_ID,
+ },
+ )
+
+ response_data = response.json()
+
+ assert response.status_code == 200
+ assert response_data["status"] == STATUS_SUCCESS
+ assert "device_info" in response_data["data"]
+ assert response_data["data"]["device_info"]["file"] == f"{IMAGES_DIR}/{test_image}"
+
+ # Cleanup
+ detach_devices()
+
+
+# route("/scsi/reserve", methods=["POST"])
+# route("/scsi/release", methods=["POST"])
+def test_reserve_and_release_device(http_client):
+ scsi_id = 0
+
+ response = http_client.post(
+ "/scsi/reserve",
+ data={
+ "scsi_id": scsi_id,
+ "memo": "TEST",
+ },
+ )
+
+ response_data = response.json()
+
+ assert response.status_code == 200
+ assert response_data["status"] == STATUS_SUCCESS
+ assert response_data["messages"][0]["message"] == f"Reserved SCSI ID {scsi_id}"
+
+ response = http_client.post(
+ "/scsi/release",
+ data={
+ "scsi_id": scsi_id,
+ },
+ )
+
+ response_data = response.json()
+
+ assert response.status_code == 200
+ assert response_data["status"] == STATUS_SUCCESS
+ assert response_data["messages"][0]["message"] == (
+ f"Released the reservation for SCSI ID {scsi_id}"
+ )
diff --git a/python/web/tests/api/test_files.py b/python/web/tests/api/test_files.py
new file mode 100644
index 00000000..cf0d96d9
--- /dev/null
+++ b/python/web/tests/api/test_files.py
@@ -0,0 +1,317 @@
+import pytest
+import uuid
+import os
+
+from conftest import (
+ IMAGES_DIR,
+ AFP_DIR,
+ SCSI_ID,
+ FILE_SIZE_1_MIB,
+ STATUS_SUCCESS,
+)
+
+
+# route("/files/create", methods=["POST"])
+def test_create_file(http_client, list_files, delete_file):
+ file_prefix = str(uuid.uuid4())
+ file_name = f"{file_prefix}.hds"
+
+ response = http_client.post(
+ "/files/create",
+ data={
+ "file_name": file_prefix,
+ "type": "hds",
+ "size": 1,
+ },
+ )
+
+ response_data = response.json()
+
+ assert response.status_code == 201
+ assert response_data["status"] == STATUS_SUCCESS
+ assert response_data["data"]["image"] == file_name
+ assert response_data["messages"][0]["message"] == f"Image file created: {file_name}"
+ assert file_name in list_files()
+
+ # Cleanup
+ delete_file(file_name)
+
+
+# route("/files/rename", methods=["POST"])
+def test_rename_file(http_client, create_test_image, list_files, delete_file):
+ original_file = create_test_image(auto_delete=False)
+ renamed_file = f"{uuid.uuid4()}.rename"
+
+ response = http_client.post(
+ "/files/rename",
+ data={"file_name": original_file, "new_file_name": renamed_file},
+ )
+
+ response_data = response.json()
+
+ assert response.status_code == 200
+ assert response_data["status"] == STATUS_SUCCESS
+ assert response_data["messages"][0]["message"] == f"Image file renamed to: {renamed_file}"
+ assert renamed_file in list_files()
+
+ # Cleanup
+ delete_file(renamed_file)
+
+
+# route("/files/copy", methods=["POST"])
+def test_copy_file(http_client, create_test_image, list_files, delete_file):
+ original_file = create_test_image()
+ copy_file = f"{uuid.uuid4()}.copy"
+
+ response = http_client.post(
+ "/files/copy",
+ data={
+ "file_name": original_file,
+ "copy_file_name": copy_file,
+ },
+ )
+
+ response_data = response.json()
+ files = list_files()
+
+ assert response.status_code == 200
+ assert response_data["status"] == STATUS_SUCCESS
+ assert response_data["messages"][0]["message"] == f"Copy of image file saved as: {copy_file}"
+ assert original_file in files
+ assert copy_file in files
+
+ # Cleanup
+ delete_file(copy_file)
+
+
+# route("/files/delete", methods=["POST"])
+def test_delete_file(http_client, create_test_image, list_files):
+ file_name = create_test_image()
+
+ response = http_client.post("/files/delete", data={"file_name": file_name})
+
+ response_data = response.json()
+
+ assert response.status_code == 200
+ assert response_data["status"] == STATUS_SUCCESS
+ assert response_data["messages"][0]["message"] == f"Image file deleted: {file_name}"
+ assert file_name not in list_files()
+
+
+# route("/files/extract_image", methods=["POST"])
+@pytest.mark.parametrize(
+ "archive_file_name,image_file_name",
+ [
+ ("test_image.zip", "test_image_from_zip.hds"),
+ ("test_image.sit", "test_image_from_sit.hds"),
+ ("test_image.7z", "test_image_from_7z.hds"),
+ ],
+)
+def test_extract_file(
+ httpserver, http_client, list_files, delete_file, archive_file_name, image_file_name
+):
+ http_path = f"/images/{archive_file_name}"
+ url = httpserver.url_for(http_path)
+
+ with open(f"tests/assets/{archive_file_name}", mode="rb") as file:
+ zip_file_data = file.read()
+
+ httpserver.expect_request(http_path).respond_with_data(
+ zip_file_data,
+ mimetype="application/octet-stream",
+ )
+
+ http_client.post(
+ "/files/download_to_images",
+ data={
+ "url": url,
+ },
+ )
+
+ response = http_client.post(
+ "/files/extract_image",
+ data={
+ "archive_file": archive_file_name,
+ "archive_members": image_file_name,
+ },
+ )
+
+ response_data = response.json()
+
+ assert response.status_code == 200
+ assert response_data["status"] == STATUS_SUCCESS
+ assert response_data["messages"][0]["message"] == "Extracted 1 file(s)"
+ assert image_file_name in list_files()
+
+ # Cleanup
+ delete_file(archive_file_name)
+ delete_file(image_file_name)
+
+
+# route("/files/upload", methods=["POST"])
+def test_upload_file(http_client, delete_file):
+ file_name = f"{uuid.uuid4()}.test"
+
+ with open("tests/assets/test_image.hds", mode="rb") as file:
+ file.seek(0, os.SEEK_END)
+ file_size = file.tell()
+ file.seek(0, 0)
+
+ number_of_chunks = 4
+
+ # Note: The test file needs to be cleanly divisible by the chunk size
+ chunk_size = int(file_size / number_of_chunks)
+
+ for chunk_number in range(0, 4):
+ if chunk_number == 0:
+ chunk_byte_offset = 0
+ else:
+ chunk_byte_offset = chunk_number * chunk_size
+
+ form_data = {
+ "dzuuid": str(uuid.uuid4()),
+ "dzchunkindex": chunk_number,
+ "dzchunksize": chunk_size,
+ "dzchunkbyteoffset": chunk_byte_offset,
+ "dztotalfilesize": file_size,
+ "dztotalchunkcount": number_of_chunks,
+ }
+
+ file_data = {"file": (file_name, file.read(chunk_size))}
+
+ response = http_client.post(
+ "/files/upload",
+ data=form_data,
+ files=file_data,
+ )
+
+ assert response.status_code == 200
+ assert response.text == "File upload successful!"
+
+ file = [f for f in http_client.get("/").json()["data"]["files"] if f["name"] == file_name][0]
+
+ assert file["size"] == file_size
+
+ # Cleanup
+ delete_file(file_name)
+
+
+# route("/files/download", methods=["POST"])
+def test_download_file(http_client, create_test_image):
+ file_name = create_test_image()
+
+ response = http_client.post("/files/download", data={"file": f"{IMAGES_DIR}/{file_name}"})
+
+ assert response.status_code == 200
+ assert response.headers["content-type"] == "application/octet-stream"
+ assert response.headers["content-disposition"] == f"attachment; filename={file_name}"
+ assert response.headers["content-length"] == str(FILE_SIZE_1_MIB)
+
+
+# route("/files/download_to_afp", methods=["POST"])
+def test_download_url_to_afp_dir(httpserver, http_client):
+ file_name = str(uuid.uuid4())
+ http_path = f"/images/{file_name}"
+ url = httpserver.url_for(http_path)
+
+ with open("tests/assets/test_image.hds", mode="rb") as file:
+ file_data = file.read()
+
+ httpserver.expect_request(http_path).respond_with_data(
+ file_data,
+ mimetype="application/octet-stream",
+ )
+
+ response = http_client.post(
+ "/files/download_to_afp",
+ data={
+ "url": url,
+ },
+ )
+
+ response_data = response.json()
+
+ assert response.status_code == 200
+ assert response_data["status"] == STATUS_SUCCESS
+ assert response_data["messages"][0]["message"] == f"{file_name} downloaded to {AFP_DIR}"
+
+
+# route("/files/download_to_images", methods=["POST"])
+def test_download_url_to_images_dir(httpserver, http_client, list_files, delete_file):
+ file_name = str(uuid.uuid4())
+ http_path = f"/images/{file_name}"
+ url = httpserver.url_for(http_path)
+
+ with open("tests/assets/test_image.hds", mode="rb") as file:
+ test_file_data = file.read()
+
+ httpserver.expect_request(http_path).respond_with_data(
+ test_file_data,
+ mimetype="application/octet-stream",
+ )
+
+ response = http_client.post(
+ "/files/download_to_images",
+ data={
+ "url": url,
+ },
+ )
+
+ response_data = response.json()
+
+ assert response.status_code == 200
+ assert response_data["status"] == STATUS_SUCCESS
+ assert file_name in list_files()
+ assert response_data["messages"][0]["message"] == f"{file_name} downloaded to {IMAGES_DIR}"
+
+ # Cleanup
+ delete_file(file_name)
+
+
+# route("/files/download_to_iso", methods=["POST"])
+def test_download_url_to_iso(
+ httpserver,
+ http_client,
+ list_files,
+ list_attached_images,
+ detach_devices,
+ delete_file,
+):
+ test_file_name = str(uuid.uuid4())
+ iso_file_name = f"{test_file_name}.iso"
+
+ http_path = f"/images/{test_file_name}"
+ url = httpserver.url_for(http_path)
+
+ with open("tests/assets/test_image.hds", mode="rb") as file:
+ test_file_data = file.read()
+
+ httpserver.expect_request(http_path).respond_with_data(
+ test_file_data,
+ mimetype="application/octet-stream",
+ )
+
+ response = http_client.post(
+ "/files/download_to_iso",
+ data={
+ "scsi_id": SCSI_ID,
+ "type": "-hfs",
+ "url": url,
+ },
+ )
+
+ response_data = response.json()
+
+ assert response.status_code == 200
+ assert response_data["status"] == STATUS_SUCCESS
+ assert iso_file_name in list_files()
+ assert iso_file_name in list_attached_images()
+
+ m = response_data["messages"]
+ assert m[0]["message"] == 'Created CD-ROM ISO image with arguments "-hfs"'
+ assert m[1]["message"] == f"Saved image as: {IMAGES_DIR}/{iso_file_name}"
+ assert m[2]["message"] == f"Attached to SCSI ID {SCSI_ID}"
+
+ # Cleanup
+ detach_devices()
+ delete_file(iso_file_name)
diff --git a/python/web/tests/api/test_misc.py b/python/web/tests/api/test_misc.py
new file mode 100644
index 00000000..5a4f6f45
--- /dev/null
+++ b/python/web/tests/api/test_misc.py
@@ -0,0 +1,99 @@
+import uuid
+
+from conftest import (
+ CFG_DIR,
+ FILE_SIZE_1_MIB,
+ STATUS_SUCCESS,
+)
+
+
+# route("/")
+def test_index(http_client):
+ response = http_client.get("/")
+ response_data = response.json()
+
+ assert response.status_code == 200
+ assert response_data["status"] == STATUS_SUCCESS
+ assert "devices" in response_data["data"]
+
+
+# route("/env")
+def test_get_env_info(http_client):
+ response = http_client.get("/env")
+ response_data = response.json()
+
+ assert response.status_code == 200
+ assert response_data["status"] == STATUS_SUCCESS
+ assert "running_env" in response_data["data"]
+
+
+# route("/pwa/")
+def test_pwa_route(http_client):
+ response = http_client.get("/pwa/favicon.ico")
+
+ assert response.status_code == 200
+ assert response.headers["content-disposition"] == "inline; filename=favicon.ico"
+
+
+# route("/drive/list", methods=["GET"])
+def test_show_named_drive_presets(http_client):
+ response = http_client.get("/drive/list")
+ response_data = response.json()
+
+ assert response.status_code == 200
+ assert response_data["status"] == STATUS_SUCCESS
+ assert "cd_conf" in response_data["data"]
+ assert "hd_conf" in response_data["data"]
+ assert "rm_conf" in response_data["data"]
+
+
+# route("/drive/cdrom", methods=["POST"])
+def test_create_cdrom_properties_file(http_client):
+ file_name = f"{uuid.uuid4()}.iso"
+
+ response = http_client.post(
+ "/drive/cdrom",
+ data={
+ "vendor": "TEST_AAA",
+ "product": "TEST_BBB",
+ "revision": "1.0A",
+ "block_size": 2048,
+ "file_name": file_name,
+ },
+ )
+
+ response_data = response.json()
+
+ assert response.status_code == 200
+ assert response_data["status"] == STATUS_SUCCESS
+ assert response_data["messages"][0]["message"] == (
+ f"File created: {CFG_DIR}/{file_name}.properties"
+ )
+
+
+# route("/drive/create", methods=["POST"])
+def test_create_image_with_properties_file(http_client, delete_file):
+ file_prefix = str(uuid.uuid4())
+ file_name = f"{file_prefix}.hds"
+
+ response = http_client.post(
+ "/drive/create",
+ data={
+ "vendor": "TEST_AAA",
+ "product": "TEST_BBB",
+ "revision": "1.0A",
+ "block_size": 512,
+ "size": FILE_SIZE_1_MIB,
+ "file_type": "hds",
+ "file_name": file_prefix,
+ },
+ )
+
+ response_data = response.json()
+
+ assert response.status_code == 200
+ assert response_data["status"] == STATUS_SUCCESS
+ assert response_data["messages"][0]["message"] == f"Image file created: {file_name}"
+
+ # Cleanup
+ delete_file(file_name)
diff --git a/python/web/tests/api/test_settings.py b/python/web/tests/api/test_settings.py
new file mode 100644
index 00000000..9e7c276a
--- /dev/null
+++ b/python/web/tests/api/test_settings.py
@@ -0,0 +1,149 @@
+import pytest
+import uuid
+
+from conftest import CFG_DIR, STATUS_SUCCESS
+
+
+# route("/language", methods=["POST"])
+@pytest.mark.parametrize(
+ "locale,confirm_message",
+ [
+ ("de", "Webinterface-Sprache auf Deutsch geändert"),
+ ("es", "Se ha cambiado el lenguaje de la Interfaz Web a español"),
+ ("fr", "Langue de l’interface web changée pour français"),
+ ("sv", "Bytte webbgränssnittets språk till svenska"),
+ ("en", "Changed Web Interface language to English"),
+ ],
+)
+def test_set_language(http_client, locale, confirm_message):
+ response = http_client.post(
+ "/language",
+ data={
+ "locale": locale,
+ },
+ )
+
+ response_data = response.json()
+
+ assert response.status_code == 200
+ assert response_data["status"] == STATUS_SUCCESS
+ assert response_data["messages"][0]["message"] == confirm_message
+
+
+# route("/logs/level", methods=["POST"])
+@pytest.mark.parametrize("level", ["trace", "debug", "info", "warn", "err", "critical", "off"])
+def test_set_log_level(http_client, level):
+ response = http_client.post(
+ "/logs/level",
+ data={
+ "level": level,
+ },
+ )
+
+ response_data = response.json()
+
+ assert response.status_code == 200
+ assert response_data["status"] == STATUS_SUCCESS
+ assert response_data["messages"][0]["message"] == f"Log level set to {level}"
+
+ # Cleanup
+ http_client.post(
+ "/logs/level",
+ data={
+ "level": "debug",
+ },
+ )
+
+
+# route("/logs/show", methods=["POST"])
+def test_show_logs(http_client):
+ response = http_client.post(
+ "/logs/show",
+ data={
+ "lines": 100,
+ "scope": "",
+ },
+ )
+
+ assert response.status_code == 200
+ assert response.headers["content-type"] == "text/plain"
+
+
+# route("/config/save", methods=["POST"])
+# route("/config/load", methods=["POST"])
+def test_save_load_and_delete_configs(http_client):
+ config_name = str(uuid.uuid4())
+ config_json_file = f"{config_name}.json"
+ reserved_scsi_id = 0
+ reservation_memo = str(uuid.uuid4())
+
+ # Confirm the initial state
+ assert http_client.get("/").json()["data"]["RESERVATIONS"][0] == ""
+
+ # Save the initial state to a config
+ response = http_client.post(
+ "/config/save",
+ data={
+ "name": config_name,
+ },
+ )
+
+ response_data = response.json()
+
+ assert response.status_code == 200
+ assert response_data["status"] == STATUS_SUCCESS
+ assert response_data["messages"][0]["message"] == (
+ f"File created: {CFG_DIR}/{config_json_file}"
+ )
+
+ assert config_json_file in http_client.get("/").json()["data"]["config_files"]
+
+ # Modify the state
+ http_client.post(
+ "/scsi/reserve",
+ data={
+ "scsi_id": reserved_scsi_id,
+ "memo": reservation_memo,
+ },
+ )
+
+ assert http_client.get("/").json()["data"]["RESERVATIONS"][0] == reservation_memo
+
+ # Load the saved config
+ response = http_client.post(
+ "/config/load",
+ data={
+ "name": config_json_file,
+ "load": True,
+ },
+ )
+
+ response_data = response.json()
+
+ assert response.status_code == 200
+ assert response_data["status"] == STATUS_SUCCESS
+ assert response_data["messages"][0]["message"] == (
+ f"Loaded configurations from: {CFG_DIR}/{config_json_file}"
+ )
+
+ # Confirm the application has returned to its initial state
+ assert http_client.get("/").json()["data"]["RESERVATIONS"][0] == ""
+
+ # Delete the saved config
+ response = http_client.post(
+ "/config/load",
+ data={
+ "name": config_json_file,
+ "delete": True,
+ },
+ )
+
+ response_data = response.json()
+
+ assert response.status_code == 200
+ assert response_data["status"] == STATUS_SUCCESS
+ assert response_data["messages"][0]["message"] == (
+ f"File deleted: {CFG_DIR}/{config_json_file}"
+ )
+
+ assert config_json_file not in http_client.get("/").json()["data"]["config_files"]
diff --git a/python/web/tests/assets/test_image.7z b/python/web/tests/assets/test_image.7z
new file mode 100644
index 00000000..f9af5b7a
Binary files /dev/null and b/python/web/tests/assets/test_image.7z differ
diff --git a/python/web/tests/assets/test_image.hds b/python/web/tests/assets/test_image.hds
new file mode 100644
index 00000000..9e0f96a2
Binary files /dev/null and b/python/web/tests/assets/test_image.hds differ
diff --git a/python/web/tests/assets/test_image.sit b/python/web/tests/assets/test_image.sit
new file mode 100644
index 00000000..a0d7c06a
Binary files /dev/null and b/python/web/tests/assets/test_image.sit differ
diff --git a/python/web/tests/assets/test_image.zip b/python/web/tests/assets/test_image.zip
new file mode 100644
index 00000000..9d85ff22
Binary files /dev/null and b/python/web/tests/assets/test_image.zip differ
diff --git a/python/web/tests/conftest.py b/python/web/tests/conftest.py
new file mode 100644
index 00000000..da10c3f7
--- /dev/null
+++ b/python/web/tests/conftest.py
@@ -0,0 +1,66 @@
+import pytest
+import requests
+
+
+def pytest_addoption(parser):
+ parser.addoption("--base_url", action="store", default="http://localhost:8080")
+ parser.addoption("--httpserver_host", action="store", default="host.docker.internal")
+ parser.addoption("--httpserver_listen_address", action="store", default="127.0.0.1")
+ parser.addoption("--rascsi_username", action="store", default="pi")
+ parser.addoption("--rascsi_password", action="store", default="rascsi")
+
+
+@pytest.fixture(scope="session")
+def httpserver_listen_address(pytestconfig):
+ return (pytestconfig.getoption("httpserver_listen_address"), 0)
+
+
+@pytest.fixture(scope="function", autouse=True)
+def set_httpserver_hostname(pytestconfig, httpserver):
+ # The HTTP requests are made by Python from within the container so we need
+ # httpserver.url_for to generate URLs which point to the Docker host
+ httpserver.host = pytestconfig.getoption("httpserver_host")
+
+
+@pytest.fixture(scope="session", autouse=True)
+def ensure_all_devices_detached(create_http_client):
+ http_client = create_http_client()
+ http_client.post("/scsi/detach_all")
+
+
+@pytest.fixture(scope="session")
+def create_http_client(pytestconfig):
+ def create(authenticate=True):
+ session = requests.Session()
+ session.headers.update({"Accept": "application/json"})
+ session.original_request = session.request
+
+ def relative_request(method, url, *args, **kwargs):
+ if url[:4] != "http":
+ url = pytestconfig.getoption("base_url") + url
+
+ return session.original_request(method, url, *args, **kwargs)
+
+ session.request = relative_request
+
+ if authenticate:
+ session.post(
+ "/login",
+ data={
+ "username": pytestconfig.getoption("rascsi_username"),
+ "password": pytestconfig.getoption("rascsi_password"),
+ },
+ )
+ return session
+
+ return create
+
+
+@pytest.fixture(scope="function")
+def http_client(create_http_client):
+ return create_http_client(authenticate=True)
+
+
+@pytest.fixture(scope="function")
+def http_client_unauthenticated(create_http_client):
+ return create_http_client(authenticate=False)