mirror of
https://github.com/akuker/RASCSI.git
synced 2025-01-02 09:35:58 +00:00
Enable unzipping of individual archive members (#385)
* Enable unzipping of individual archive members * Add code comments
This commit is contained in:
parent
3546764722
commit
28de92a00c
@ -65,6 +65,8 @@ def list_images():
|
|||||||
prop_data = list_files(PROPERTIES_SUFFIX, cfg_dir)
|
prop_data = list_files(PROPERTIES_SUFFIX, cfg_dir)
|
||||||
prop_files = [PurePath(x[0]).stem for x in prop_data]
|
prop_files = [PurePath(x[0]).stem for x in prop_data]
|
||||||
|
|
||||||
|
from zipfile import ZipFile, is_zipfile
|
||||||
|
server_info = get_server_info()
|
||||||
files = []
|
files = []
|
||||||
for f in result.image_files_info.image_files:
|
for f in result.image_files_info.image_files:
|
||||||
# Add properties meta data for the image, if applicable
|
# Add properties meta data for the image, if applicable
|
||||||
@ -73,6 +75,22 @@ def list_images():
|
|||||||
prop = process["conf"]
|
prop = process["conf"]
|
||||||
else:
|
else:
|
||||||
prop = False
|
prop = False
|
||||||
|
if f.name.lower().endswith(".zip"):
|
||||||
|
zip_path = f"{server_info['image_dir']}/{f.name}"
|
||||||
|
if is_zipfile(zip_path):
|
||||||
|
zip = ZipFile(zip_path)
|
||||||
|
# Get a list of str containing all zipfile members
|
||||||
|
zip_members = zip.namelist()
|
||||||
|
# Strip out directories from the list
|
||||||
|
zip_members = [x for x in zip_members if not x.endswith("/")]
|
||||||
|
# Reduce members to file names only (strip out full path)
|
||||||
|
zip_members = [PurePath(x).name for x in zip_members]
|
||||||
|
else:
|
||||||
|
logging.warning(f"{zip_path} is an invalid zip file")
|
||||||
|
zip_members = False
|
||||||
|
else:
|
||||||
|
zip_members = False
|
||||||
|
|
||||||
size_mb = "{:,.1f}".format(f.size / 1024 / 1024)
|
size_mb = "{:,.1f}".format(f.size / 1024 / 1024)
|
||||||
dtype = proto.PbDeviceType.Name(f.type)
|
dtype = proto.PbDeviceType.Name(f.type)
|
||||||
files.append(
|
files.append(
|
||||||
@ -82,6 +100,7 @@ def list_images():
|
|||||||
"size_mb": size_mb,
|
"size_mb": size_mb,
|
||||||
"detected_type": dtype,
|
"detected_type": dtype,
|
||||||
"prop": prop,
|
"prop": prop,
|
||||||
|
"zip": zip_members,
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -136,18 +155,24 @@ def delete_file(file_path):
|
|||||||
return {"status": False, "msg": "Could not delete file"}
|
return {"status": False, "msg": "Could not delete file"}
|
||||||
|
|
||||||
|
|
||||||
def unzip_file(file_name):
|
def unzip_file(file_name, member=False):
|
||||||
"""
|
"""
|
||||||
Takes (str) file_name
|
Takes (str) file_name, optional (str) member
|
||||||
Returns dict with (boolean) status and (list of str) msg
|
Returns dict with (boolean) status and (list of str) msg
|
||||||
"""
|
"""
|
||||||
from subprocess import run
|
from subprocess import run
|
||||||
server_info = get_server_info()
|
server_info = get_server_info()
|
||||||
|
|
||||||
|
if member == False:
|
||||||
unzip_proc = run(
|
unzip_proc = run(
|
||||||
["unzip", "-d", server_info["image_dir"], "-n", "-j", \
|
["unzip", "-d", server_info["image_dir"], "-n", "-j", \
|
||||||
f"{server_info['image_dir']}/{file_name}"], capture_output=True
|
f"{server_info['image_dir']}/{file_name}"], capture_output=True
|
||||||
)
|
)
|
||||||
|
else:
|
||||||
|
unzip_proc = run(
|
||||||
|
["unzip", "-d", server_info["image_dir"], "-n", "-j", \
|
||||||
|
f"{server_info['image_dir']}/{file_name}", member], capture_output=True
|
||||||
|
)
|
||||||
if unzip_proc.returncode != 0:
|
if unzip_proc.returncode != 0:
|
||||||
stderr = unzip_proc.stderr.decode("utf-8")
|
stderr = unzip_proc.stderr.decode("utf-8")
|
||||||
logging.warning(f"Unzipping failed: {stderr}")
|
logging.warning(f"Unzipping failed: {stderr}")
|
||||||
|
@ -140,6 +140,24 @@
|
|||||||
</ul>
|
</ul>
|
||||||
</details>
|
</details>
|
||||||
</td>
|
</td>
|
||||||
|
{% elif file["zip"] %}
|
||||||
|
<td>
|
||||||
|
<details>
|
||||||
|
<summary>{{file["name"]}}</summary>
|
||||||
|
<ul>
|
||||||
|
{% for member in file["zip"] %}
|
||||||
|
<li>
|
||||||
|
<label for="member">{{member}}</label>
|
||||||
|
<form action="/files/unzip" method="post">
|
||||||
|
<input type="hidden" name="image" value="{{file['name']}}">
|
||||||
|
<input type="hidden" name="member" value="{{member}}">
|
||||||
|
<input type="submit" value="Unzip" />
|
||||||
|
</form>
|
||||||
|
</li>
|
||||||
|
{% endfor %}
|
||||||
|
</ul>
|
||||||
|
</details>
|
||||||
|
</td>
|
||||||
{% else %}
|
{% else %}
|
||||||
<td>{{file["name"]}}</td>
|
<td>{{file["name"]}}</td>
|
||||||
{% endif %}
|
{% endif %}
|
||||||
@ -156,7 +174,7 @@
|
|||||||
{% if file["name"].lower().endswith(archive_file_suffix) %}
|
{% if file["name"].lower().endswith(archive_file_suffix) %}
|
||||||
<form action="/files/unzip" method="post">
|
<form action="/files/unzip" method="post">
|
||||||
<input type="hidden" name="image" value="{{file["name"]}}">
|
<input type="hidden" name="image" value="{{file["name"]}}">
|
||||||
<input type="submit" value="Unzip" />
|
<input type="submit" value="Unzip All" />
|
||||||
</form>
|
</form>
|
||||||
{% else %}
|
{% else %}
|
||||||
<form action="/scsi/attach" method="post">
|
<form action="/scsi/attach" method="post">
|
||||||
|
@ -645,8 +645,9 @@ def delete():
|
|||||||
@app.route("/files/unzip", methods=["POST"])
|
@app.route("/files/unzip", methods=["POST"])
|
||||||
def unzip():
|
def unzip():
|
||||||
image = request.form.get("image")
|
image = request.form.get("image")
|
||||||
|
member = request.form.get("member") or False
|
||||||
|
|
||||||
process = unzip_file(image)
|
process = unzip_file(image, member)
|
||||||
if process["status"]:
|
if process["status"]:
|
||||||
if len(process["msg"]) < 1:
|
if len(process["msg"]) < 1:
|
||||||
flash("Aborted unzip: File(s) with the same name already exists.", "error")
|
flash("Aborted unzip: File(s) with the same name already exists.", "error")
|
||||||
|
Loading…
Reference in New Issue
Block a user