Allow images to be extracted from StuffIt, tarball, gzip, and 7z archives

This commit is contained in:
nucleogenic
2022-08-12 04:19:02 +01:00
parent a523e1febe
commit b5e6438a45
13 changed files with 438 additions and 127 deletions
@@ -12,6 +12,15 @@ CONFIG_FILE_SUFFIX = "json"
# File ending used for drive properties files
PROPERTIES_SUFFIX = "properties"
# File suffixes (lowercase, without the leading dot) that are treated as archives
ARCHIVE_FILE_SUFFIXES = ["zip", "sit", "tar", "gz", "7z"]
# RESERVATIONS keeps track of the reserved ID memos.
# It starts out as a list of 8 empty strings.
RESERVATIONS = [""] * 8
+101 -66
View File
@@ -5,9 +5,9 @@ Module for methods reading from and writing to the file system
import os
import logging
import asyncio
from functools import lru_cache
from pathlib import PurePath
from zipfile import ZipFile, is_zipfile
from re import escape, findall
from time import time
from subprocess import run, CalledProcessError
from json import dump, load
@@ -16,10 +16,11 @@ from shutil import copyfile
import requests
import rascsi_interface_pb2 as proto
from rascsi.common_settings import CFG_DIR, CONFIG_FILE_SUFFIX, PROPERTIES_SUFFIX, RESERVATIONS
from rascsi.common_settings import CFG_DIR, CONFIG_FILE_SUFFIX, PROPERTIES_SUFFIX, ARCHIVE_FILE_SUFFIXES, RESERVATIONS
from rascsi.ractl_cmds import RaCtlCmds
from rascsi.return_codes import ReturnCodes
from rascsi.socket_cmds import SocketCmds
from util import unarchiver
class FileCmds:
@@ -97,19 +98,31 @@ class FileCmds:
prop = process["conf"]
else:
prop = False
if file.name.lower().endswith(".zip"):
zip_path = f"{server_info['image_dir']}/{file.name}"
if is_zipfile(zip_path):
zipfile = ZipFile(zip_path)
# Get a list of (str) containing all zipfile members
zip_members = zipfile.namelist()
# Strip out directories from the list
zip_members = [x for x in zip_members if not x.endswith("/")]
else:
logging.warning("%s is an invalid zip file", zip_path)
zip_members = False
else:
zip_members = False
archive_contents = []
if PurePath(file.name).suffix.lower()[1:] in ARCHIVE_FILE_SUFFIXES:
try:
archive_info = self._get_archive_info(
f"{server_info['image_dir']}/{file.name}",
_cache_extra_key=file.size
)
properties_files = [x["path"]
for x in archive_info["members"]
if x["path"].endswith(PROPERTIES_SUFFIX)]
for member in archive_info["members"]:
if member["is_dir"] or member["is_resource_fork"]:
continue
if PurePath(member["path"]).suffix.lower()[1:] == PROPERTIES_SUFFIX:
member["is_properties_file"] = True
elif f"{member['path']}.{PROPERTIES_SUFFIX}" in properties_files:
member["related_properties_file"] = f"{member['path']}.{PROPERTIES_SUFFIX}"
archive_contents.append(member)
except (unarchiver.LsarCommandError, unarchiver.LsarOutputError):
pass
size_mb = "{:,.1f}".format(file.size / 1024 / 1024)
dtype = proto.PbDeviceType.Name(file.type)
@@ -119,7 +132,7 @@ class FileCmds:
"size_mb": size_mb,
"detected_type": dtype,
"prop": prop,
"zip_members": zip_members,
"archive_contents": archive_contents,
})
return {"status": result.status, "msg": result.msg, "files": files}
@@ -266,62 +279,73 @@ class FileCmds:
"parameters": parameters,
}
def unzip_file(self, file_name, member=False, members=False):
def extract_image(self, file_path, members=None, move_properties_files_to_config=True):
"""
Takes (str) file_name, optional (str) member, optional (list) of (str) members
file_name is the name of the zip file to unzip
member is the full path to the particular file in the zip file to unzip
members contains all of the full paths to each of the zip archive members
Returns (dict) with (boolean) status and (list of str) msg
Takes (str) file_path, (list) members, optional (bool) move_properties_files_to_config
file_name is the path of the archive file to extract, relative to the images directory
members is a list of file paths in the archive file to extract
move_properties_files_to_config controls if .properties files are auto-moved to CFG_DIR
Returns (dict) result
"""
server_info = self.ractl.get_server_info()
prop_flag = False
if not member:
unzip_proc = asyncio.run(self.run_async("unzip", [
"-d",
server_info['image_dir'],
"-n",
"-j",
f"{server_info['image_dir']}/{file_name}",
]))
if members:
for path in members:
if path.endswith(PROPERTIES_SUFFIX):
name = PurePath(path).name
self.rename_file(f"{server_info['image_dir']}/{name}", f"{CFG_DIR}/{name}")
prop_flag = True
else:
member = escape(member)
unzip_proc = asyncio.run(self.run_async("unzip", [
"-d",
server_info['image_dir'],
"-n",
"-j",
f"{server_info['image_dir']}/{file_name}",
member,
]))
# Attempt to unzip a properties file in the same archive dir
unzip_prop = asyncio.run(self.run_async("unzip", [
"-d",
CFG_DIR,
"-n",
"-j",
f"{server_info['image_dir']}/{file_name}",
f"{member}.{PROPERTIES_SUFFIX}",
]))
if not members:
return {
"status": False,
"return_code": ReturnCodes.EXTRACTIMAGE_NO_FILES_SPECIFIED,
}
if unzip_prop["returncode"] == 0:
prop_flag = True
if unzip_proc["returncode"] != 0:
logging.warning("Unzipping failed: %s", unzip_proc["stderr"])
return {"status": False, "msg": unzip_proc["stderr"]}
try:
extract_result = unarchiver.extract_archive(
f"{server_info['image_dir']}/{file_path}",
members=members,
output_dir=server_info["image_dir"],
)
properties_files_moved = []
if move_properties_files_to_config:
for file in extract_result["extracted"]:
if file.get("name").endswith(".properties"):
if (self.rename_file(
file["absolute_path"],
f"{CFG_DIR}/{file['name']}"
)):
properties_files_moved.append({
"status": True,
"name": file["path"],
"path": f"{CFG_DIR}/{file['name']}",
})
else:
properties_files_moved.append({
"status": False,
"name": file["path"],
"path": f"{CFG_DIR}/{file['name']}",
})
return {
"status": True,
"return_code": ReturnCodes.EXTRACTIMAGE_SUCCESS,
"parameters": {
"count": len(extract_result["extracted"]),
},
"extracted": extract_result["extracted"],
"skipped": extract_result["skipped"],
"properties_files_moved": properties_files_moved,
}
except unarchiver.UnarNoFilesExtractedError:
return {
"status": False,
"return_code": ReturnCodes.EXTRACTIMAGE_NO_FILES_EXTRACTED,
}
except (unarchiver.UnarCommandError, unarchiver.UnarUnexpectedOutputError) as error:
return {
"status": False,
"return_code": ReturnCodes.EXTRACTIMAGE_COMMAND_ERROR,
"parameters": {
"error": error,
}
}
unzipped = findall(
"(?:inflating|extracting):(.+)\n",
unzip_proc["stdout"]
)
return {"status": True, "msg": unzipped, "prop_flag": prop_flag}
def download_file_to_iso(self, url, *iso_args):
"""
@@ -652,3 +676,14 @@ class FileCmds:
logging.info("stderr: %s", stderr)
return {"returncode": proc.returncode, "stdout": stdout, "stderr": stderr}
# noinspection PyMethodMayBeStatic
@lru_cache(maxsize=32)
def _get_archive_info(self, file_path, **kwargs):
    """
    Memoized pass-through to unarchiver.inspect_archive, e.g. to speed up the index screen
    Takes (str) file_path; any keyword arguments are not forwarded and only take part
    in the cache key, so that callers can force a fresh lookup by varying them
    Returns the (dict) produced by unarchiver.inspect_archive
    Errors raised by unarchiver.inspect_archive propagate to the caller unchanged
    """
    return unarchiver.inspect_archive(file_path)
+4
View File
@@ -20,3 +20,7 @@ class ReturnCodes:
READDRIVEPROPS_SUCCESS = 70
READDRIVEPROPS_COULD_NOT_READ = 71
ATTACHIMAGE_COULD_NOT_ATTACH = 80
EXTRACTIMAGE_SUCCESS = 90
EXTRACTIMAGE_NO_FILES_SPECIFIED = 91
EXTRACTIMAGE_NO_FILES_EXTRACTED = 92
EXTRACTIMAGE_COMMAND_ERROR = 93
View File
+45
View File
@@ -0,0 +1,45 @@
"""
Utility module for running system commands with basic logging
"""
import asyncio
import logging
import os
def run(program, args=None):
    """ Execute a command, block until it has finished, and return the run_async result """
    command = run_async(program, args)
    return asyncio.run(command)
async def run_async(program, args=None):
    """
    Run a command in a subprocess and capture its output
    Takes (str) program, optional (list) of (str) args
    Returns (dict) with (int) returncode, (str) stdout, (str) stderr
    """
    # args defaults to None, which can neither be unpacked nor joined
    args = list(args) if args else []

    proc = await asyncio.create_subprocess_exec(
        program,
        *args,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    stdout, stderr = await proc.communicate()

    logging.info(
        "Executed command \"%s %s\" with status code %d",
        program,
        " ".join(map(str, args)),
        proc.returncode,
    )

    # communicate() returns bytes; decode unconditionally so that callers
    # receive str even when a stream is empty (previously b"" leaked through)
    stdout = stdout.decode()
    stderr = stderr.decode()

    if stdout:
        logging.debug(stdout)

    if stderr:
        logging.warning(stderr)

    return {
        "returncode": proc.returncode,
        "stdout": stdout,
        "stderr": stderr,
    }
+201
View File
@@ -0,0 +1,201 @@
"""
A minimal wrapper around 'The Unarchiver' command line tools (v1.10.1)
https://theunarchiver.com/command-line
Later versions (untested) available at: https://github.com/MacPaw/XADMaster
"""
import logging
import pathlib
from tempfile import TemporaryDirectory
from re import escape, match
from json import loads, JSONDecodeError
from shutil import move
from util.run import run
# Values accepted for the fork_output_type argument of extract_archive,
# which hands them to `unar` through its -forks option.
# "visible" names resource forks <name>.rsrc
FORK_OUTPUT_TYPE_VISIBLE = "visible"
# "hidden" names resource forks ._<name>
FORK_OUTPUT_TYPE_HIDDEN = "hidden"
# All values that extract_archive accepts for fork_output_type
FORK_OUTPUT_TYPES = [FORK_OUTPUT_TYPE_VISIBLE, FORK_OUTPUT_TYPE_HIDDEN]
def extract_archive(file_path, **kwargs):
    """
    Extracts files from an archive with `unar`
    Files are unpacked into a temporary directory first, and then moved to the
    output directory; members whose target path already exists are skipped.
    Takes (str) file_path, and kwargs:
    - (list) members - list of (str) files to be extracted (all files are extracted if None)
    - (str) output_dir - directory to place the extracted files
      (defaults to the directory that contains the archive)
    - (str) fork_output_type - output type for resource forks; "visible" for *.rsrc files,
      "hidden" for ._* files (resource forks are left out of the result when not given)
    Returns (dict) of extracted and skipped members
    Raises ValueError for an invalid output_dir or fork_output_type,
    UnarCommandError when unar exits with a non-zero return code,
    UnarNoFilesExtractedError when unar reports that no files were extracted,
    UnarUnexpectedOutputError when the output of unar is not recognized
    """
    members = kwargs.get("members")

    if kwargs.get("output_dir"):
        if not pathlib.Path(kwargs["output_dir"]).is_dir():
            raise ValueError("Argument output_dir must be a directory")
        output_dir = str(pathlib.Path(kwargs["output_dir"]).resolve())
    else:
        output_dir = str(pathlib.Path(file_path).parent.resolve())

    if kwargs.get("fork_output_type"):
        if kwargs["fork_output_type"] not in FORK_OUTPUT_TYPES:
            raise ValueError(f"Argument fork_output_type must be one of: {','.join(FORK_OUTPUT_TYPES)} ")
        fork_output_type = kwargs["fork_output_type"]
        fork_output_type_args = ["-forks", fork_output_type]
    else:
        fork_output_type = None
        fork_output_type_args = []

    unar_result_success = r'^Successfully extracted to "(?P<destination>.+)".$'
    unar_result_no_files = "No files extracted."
    # NOTE(review): unar is expected to indent each member line by two spaces;
    # the count is written as {2} so that it cannot be lost to whitespace
    # normalization. Confirm against the installed unar version.
    unar_file_extracted = \
        r"^ {2}(?P<path>.+). \(((?P<size>[0-9]+) B)?(?P<types>(dir)?(, )?(rsrc)?)\)\.\.\. (?P<status>[A-Z]+)\.$"

    with TemporaryDirectory() as tmp_dir:
        unar_args = [
            "-output-directory",
            tmp_dir,
            "-force-skip",
            "-no-directory",
            *fork_output_type_args,
            "--",
            file_path,
        ]

        if members:
            unar_args.extend(escape(name) for name in members)

        process = run("unar", unar_args)

        if process["returncode"] != 0:
            raise UnarCommandError(f"Non-zero return code: {process['returncode']}")

        # Tolerate missing output (None, "" or b"") instead of failing on rstrip
        lines = (process["stdout"] or "").rstrip("\n").split("\n")

        if lines[-1] == unar_result_no_files:
            raise UnarNoFilesExtractedError

        if match(unar_result_success, lines[-1]):
            extracted_members = []

            # The first line is skipped and the last line holds the overall result;
            # every line in between has to describe one member
            for line in lines[1:-1]:
                if line_matches := match(unar_file_extracted, line):
                    matches = line_matches.groupdict()
                    member = {
                        "name": str(pathlib.PurePath(matches["path"]).name),
                        "path": matches["path"],
                        "size": matches["size"] or 0,
                        "is_dir": False,
                        "is_resource_fork": False,
                        "absolute_path": str(pathlib.PurePath(tmp_dir).joinpath(matches["path"])),
                    }

                    member_types = matches.get("types", "").removeprefix(", ").split(", ")

                    if "dir" in member_types:
                        member["is_dir"] = True

                    if "rsrc" in member_types:
                        # Without -forks no resource fork file is written, so there is nothing to report
                        if not fork_output_type:
                            continue

                        member["is_resource_fork"] = True

                        # Update names/paths to match unar resource fork naming convention
                        if fork_output_type == FORK_OUTPUT_TYPE_HIDDEN:
                            member["name"] = f"._{member['name']}"
                        else:
                            member["name"] += ".rsrc"

                        member["path"] = str(pathlib.PurePath(member["path"]).parent.joinpath(member["name"]))
                        member["absolute_path"] = str(pathlib.PurePath(tmp_dir).joinpath(member["path"]))

                    logging.debug("Extracted: %s -> %s", member['path'], member['absolute_path'])
                    extracted_members.append(member)
                else:
                    raise UnarUnexpectedOutputError(f"Unexpected output: {line}")

            moved = []
            skipped = []

            # Sorted by path, so that a directory is handled before the files inside it
            for member in sorted(extracted_members, key=lambda m: m["path"]):
                source_path = pathlib.Path(member["absolute_path"])
                target_path = pathlib.Path(output_dir).joinpath(member["path"])
                member["absolute_path"] = str(target_path)

                if target_path.exists():
                    logging.info("Skipping temp file/dir as the target already exists: %s", target_path)
                    skipped.append(member)
                    continue

                if member["is_dir"]:
                    logging.debug("Creating empty dir: %s -> %s", source_path, target_path)
                    target_path.mkdir(parents=True, exist_ok=True)
                    moved.append(member)
                    continue

                # The parent dir may not be specified as a member, so ensure it exists
                target_path.parent.mkdir(parents=True, exist_ok=True)
                logging.debug("Moving temp file: %s -> %s", source_path, target_path)
                # The temporary directory may be on another file system than output_dir,
                # where a plain rename fails; move falls back to copy-and-delete
                move(str(source_path), str(target_path))
                moved.append(member)

            return {
                "extracted": moved,
                "skipped": skipped,
            }

        raise UnarUnexpectedOutputError(lines[-1])
def inspect_archive(file_path, **kwargs):
    """
    Calls `lsar` to inspect the contents of an archive
    Takes (str) file_path; further keyword arguments are accepted, but not used
    Returns (dict) of (str) format, (list) members
    Raises FileNotFoundError when file_path does not exist,
    LsarCommandError when lsar exits with a non-zero return code,
    LsarOutputError when the output of lsar is not valid JSON
    """
    # A Path object is always truthy, so its existence has to be checked explicitly
    if not pathlib.Path(file_path).exists():
        raise FileNotFoundError(f"File {file_path} does not exist")

    process = run("lsar", ["-json", "--", file_path])

    if process["returncode"] != 0:
        raise LsarCommandError(f"Non-zero return code: {process['returncode']}")

    try:
        archive_info = loads(process["stdout"])
    except JSONDecodeError as error:
        raise LsarOutputError(f"Unable to read JSON output from lsar: {error.msg}") from error

    # Reduce the lsar entries to the common member fields; the full entry is kept under "raw"
    members = [{
        "name": pathlib.PurePath(member.get("XADFileName")).name,
        "path": member.get("XADFileName"),
        "size": member.get("XADFileSize"),
        "is_dir": member.get("XADIsDirectory"),
        "is_resource_fork": member.get("XADIsResourceFork"),
        "raw": member,
    } for member in archive_info.get("lsarContents", [])]

    return {
        "format": archive_info.get("lsarFormatName"),
        "members": members,
    }
class UnarCommandError(Exception):
    """ Raised when `unar` exits with a non-zero return code """
class UnarNoFilesExtractedError(Exception):
    """ Raised when `unar` finishes, but reports that no files were extracted """
class UnarUnexpectedOutputError(Exception):
    """ Raised when a line in the output of `unar` is not recognized """
class LsarCommandError(Exception):
    """ Raised when `lsar` exits with a non-zero return code """
class LsarOutputError(Exception):
    """ Raised when the output of `lsar` cannot be parsed as JSON """