RASCSI/python/common/src/rascsi/file_cmds.py


"""
Module for methods reading from and writing to the file system
"""
import os
import logging
import asyncio
from functools import lru_cache
from pathlib import PurePath
from zipfile import ZipFile, is_zipfile
from time import time
from subprocess import run, CalledProcessError
from json import dump, load
from shutil import copyfile
import requests
import rascsi_interface_pb2 as proto
from rascsi.common_settings import CFG_DIR, CONFIG_FILE_SUFFIX, PROPERTIES_SUFFIX, ARCHIVE_FILE_SUFFIXES, RESERVATIONS
from rascsi.ractl_cmds import RaCtlCmds
from rascsi.return_codes import ReturnCodes
from rascsi.socket_cmds import SocketCmds
from util import unarchiver
class FileCmds:
"""
class for methods reading from and writing to the file system
"""
def __init__(self, sock_cmd: SocketCmds, ractl: RaCtlCmds, token=None, locale=None):
self.sock_cmd = sock_cmd
self.ractl = ractl
self.token = token
self.locale = locale
# noinspection PyMethodMayBeStatic
# pylint: disable=no-self-use
def list_files(self, file_types, dir_path):
"""
Takes a (list) or (tuple) of (str) file_types - e.g. ('hda', 'hds')
        Returns (list) of (tuple)s files_list:
        index 0 is (str) file name and index 1 is (int) size in bytes
"""
files_list = []
for path, _dirs, files in os.walk(dir_path):
# Only list selected file types
files = [f for f in files if f.lower().endswith(file_types)]
files_list.extend(
[
(
file,
os.path.getsize(os.path.join(path, file))
)
for file in files
]
)
return files_list
# noinspection PyMethodMayBeStatic
def list_config_files(self):
"""
        Finds files with the file ending CONFIG_FILE_SUFFIX in CFG_DIR.
Returns a (list) of (str) files_list
"""
files_list = []
for _root, _dirs, files in os.walk(CFG_DIR):
for file in files:
if file.endswith("." + CONFIG_FILE_SUFFIX):
files_list.append(file)
return files_list
def list_images(self):
"""
        Sends a DEFAULT_IMAGE_FILES_INFO command to the server
Returns a (dict) with (bool) status, (str) msg, and (list) of (dict)s files
"""
command = proto.PbCommand()
command.operation = proto.PbOperation.DEFAULT_IMAGE_FILES_INFO
command.params["token"] = self.token
command.params["locale"] = self.locale
data = self.sock_cmd.send_pb_command(command.SerializeToString())
result = proto.PbResult()
result.ParseFromString(data)
# Get a list of all *.properties files in CFG_DIR
prop_data = self.list_files(PROPERTIES_SUFFIX, CFG_DIR)
prop_files = [PurePath(x[0]).stem for x in prop_data]
server_info = self.ractl.get_server_info()
files = []
for file in result.image_files_info.image_files:
# Add properties meta data for the image, if applicable
if file.name in prop_files:
process = self.read_drive_properties(f"{CFG_DIR}/{file.name}.{PROPERTIES_SUFFIX}")
prop = process["conf"]
else:
prop = False
archive_contents = []
if PurePath(file.name).suffix.lower()[1:] in ARCHIVE_FILE_SUFFIXES:
try:
archive_info = self._get_archive_info(
f"{server_info['image_dir']}/{file.name}",
_cache_extra_key=file.size
)
properties_files = [x["path"]
for x in archive_info["members"]
if x["path"].endswith(PROPERTIES_SUFFIX)]
for member in archive_info["members"]:
if member["is_dir"] or member["is_resource_fork"]:
continue
if PurePath(member["path"]).suffix.lower()[1:] == PROPERTIES_SUFFIX:
member["is_properties_file"] = True
elif f"{member['path']}.{PROPERTIES_SUFFIX}" in properties_files:
member["related_properties_file"] = f"{member['path']}.{PROPERTIES_SUFFIX}"
archive_contents.append(member)
except (unarchiver.LsarCommandError, unarchiver.LsarOutputError):
pass
size_mb = "{:,.1f}".format(file.size / 1024 / 1024)
dtype = proto.PbDeviceType.Name(file.type)
files.append({
"name": file.name,
"size": file.size,
"size_mb": size_mb,
"detected_type": dtype,
"prop": prop,
"archive_contents": archive_contents,
})
return {"status": result.status, "msg": result.msg, "files": files}
def create_new_image(self, file_name, file_type, size):
"""
Takes (str) file_name, (str) file_type, and (int) size
Sends a CREATE_IMAGE command to the server
Returns (dict) with (bool) status and (str) msg
"""
command = proto.PbCommand()
command.operation = proto.PbOperation.CREATE_IMAGE
command.params["token"] = self.token
command.params["locale"] = self.locale
command.params["file"] = file_name + "." + file_type
command.params["size"] = str(size)
command.params["read_only"] = "false"
data = self.sock_cmd.send_pb_command(command.SerializeToString())
result = proto.PbResult()
result.ParseFromString(data)
return {"status": result.status, "msg": result.msg}
def delete_image(self, file_name):
"""
Takes (str) file_name
Sends a DELETE_IMAGE command to the server
Returns (dict) with (bool) status and (str) msg
"""
command = proto.PbCommand()
command.operation = proto.PbOperation.DELETE_IMAGE
command.params["token"] = self.ractl.token
command.params["locale"] = self.ractl.locale
command.params["file"] = file_name
data = self.sock_cmd.send_pb_command(command.SerializeToString())
result = proto.PbResult()
result.ParseFromString(data)
return {"status": result.status, "msg": result.msg}
def rename_image(self, file_name, new_file_name):
"""
Takes (str) file_name, (str) new_file_name
Sends a RENAME_IMAGE command to the server
Returns (dict) with (bool) status and (str) msg
"""
command = proto.PbCommand()
command.operation = proto.PbOperation.RENAME_IMAGE
command.params["token"] = self.ractl.token
command.params["locale"] = self.ractl.locale
command.params["from"] = file_name
command.params["to"] = new_file_name
data = self.sock_cmd.send_pb_command(command.SerializeToString())
result = proto.PbResult()
result.ParseFromString(data)
return {"status": result.status, "msg": result.msg}
def copy_image(self, file_name, new_file_name):
"""
Takes (str) file_name, (str) new_file_name
Sends a COPY_IMAGE command to the server
Returns (dict) with (bool) status and (str) msg
"""
command = proto.PbCommand()
command.operation = proto.PbOperation.COPY_IMAGE
command.params["token"] = self.ractl.token
command.params["locale"] = self.ractl.locale
command.params["from"] = file_name
command.params["to"] = new_file_name
data = self.sock_cmd.send_pb_command(command.SerializeToString())
result = proto.PbResult()
result.ParseFromString(data)
return {"status": result.status, "msg": result.msg}
# noinspection PyMethodMayBeStatic
def delete_file(self, file_path):
"""
Takes (str) file_path with the full path to the file to delete
Returns (dict) with (bool) status and (str) msg
"""
parameters = {
"file_path": file_path
}
if os.path.exists(file_path):
os.remove(file_path)
return {
"status": True,
"return_code": ReturnCodes.DELETEFILE_SUCCESS,
"parameters": parameters,
}
return {
"status": False,
"return_code": ReturnCodes.DELETEFILE_FILE_NOT_FOUND,
"parameters": parameters,
}
# noinspection PyMethodMayBeStatic
def rename_file(self, file_path, target_path):
"""
Takes (str) file_path and (str) target_path
Returns (dict) with (bool) status and (str) msg
"""
parameters = {
"target_path": target_path
}
if os.path.exists(PurePath(target_path).parent):
os.rename(file_path, target_path)
return {
"status": True,
"return_code": ReturnCodes.RENAMEFILE_SUCCESS,
"parameters": parameters,
}
return {
"status": False,
"return_code": ReturnCodes.RENAMEFILE_UNABLE_TO_MOVE,
"parameters": parameters,
}
# noinspection PyMethodMayBeStatic
def copy_file(self, file_path, target_path):
"""
Takes (str) file_path and (str) target_path
Returns (dict) with (bool) status and (str) msg
"""
parameters = {
"target_path": target_path
}
if os.path.exists(PurePath(target_path).parent):
copyfile(file_path, target_path)
return {
"status": True,
"return_code": ReturnCodes.WRITEFILE_SUCCESS,
"parameters": parameters,
}
return {
"status": False,
"return_code": ReturnCodes.WRITEFILE_UNABLE_TO_WRITE,
"parameters": parameters,
}
def extract_image(self, file_path, members=None, move_properties_files_to_config=True):
"""
Takes (str) file_path, (list) members, optional (bool) move_properties_files_to_config
        file_path is the path of the archive file to extract, relative to the images directory
members is a list of file paths in the archive file to extract
move_properties_files_to_config controls if .properties files are auto-moved to CFG_DIR
Returns (dict) result
"""
server_info = self.ractl.get_server_info()
if not members:
return {
"status": False,
"return_code": ReturnCodes.EXTRACTIMAGE_NO_FILES_SPECIFIED,
}
try:
extract_result = unarchiver.extract_archive(
f"{server_info['image_dir']}/{file_path}",
members=members,
output_dir=server_info["image_dir"],
)
properties_files_moved = []
if move_properties_files_to_config:
for file in extract_result["extracted"]:
if file.get("name").endswith(".properties"):
                        move_result = self.rename_file(
                            file["absolute_path"],
                            f"{CFG_DIR}/{file['name']}",
                        )
                        # rename_file returns a dict, which is always truthy, so check
                        # its "status" field to learn whether the move actually succeeded.
                        if move_result["status"]:
                            properties_files_moved.append({
                                "status": True,
                                "name": file["path"],
                                "path": f"{CFG_DIR}/{file['name']}",
                            })
                        else:
                            properties_files_moved.append({
                                "status": False,
                                "name": file["path"],
                                "path": f"{CFG_DIR}/{file['name']}",
                            })
return {
"status": True,
"return_code": ReturnCodes.EXTRACTIMAGE_SUCCESS,
"parameters": {
"count": len(extract_result["extracted"]),
},
"extracted": extract_result["extracted"],
"skipped": extract_result["skipped"],
"properties_files_moved": properties_files_moved,
}
except unarchiver.UnarNoFilesExtractedError:
return {
"status": False,
"return_code": ReturnCodes.EXTRACTIMAGE_NO_FILES_EXTRACTED,
}
except (unarchiver.UnarCommandError, unarchiver.UnarUnexpectedOutputError) as error:
return {
"status": False,
"return_code": ReturnCodes.EXTRACTIMAGE_COMMAND_ERROR,
"parameters": {
"error": error,
}
}
def download_file_to_iso(self, url, *iso_args):
"""
Takes (str) url and one or more (str) *iso_args
Returns (dict) with (bool) status and (str) msg
"""
server_info = self.ractl.get_server_info()
file_name = PurePath(url).name
tmp_ts = int(time())
tmp_dir = "/tmp/" + str(tmp_ts) + "/"
os.mkdir(tmp_dir)
tmp_full_path = tmp_dir + file_name
iso_filename = f"{server_info['image_dir']}/{file_name}.iso"
req_proc = self.download_to_dir(url, tmp_dir, file_name)
if not req_proc["status"]:
return {"status": False, "msg": req_proc["msg"]}
if is_zipfile(tmp_full_path):
if "XtraStuf.mac" in str(ZipFile(tmp_full_path).namelist()):
logging.info("MacZip file format detected. Will not unzip to retain resource fork.")
else:
logging.info(
"%s is a zipfile! Will attempt to unzip and store the resulting files.",
tmp_full_path,
)
unzip_proc = asyncio.run(self.run_async("unzip", [
"-d",
tmp_dir,
"-n",
tmp_full_path,
]))
if not unzip_proc["returncode"]:
logging.info(
"%s was successfully unzipped. Deleting the zipfile.",
tmp_full_path,
)
self.delete_file(tmp_full_path)
try:
run(
[
"genisoimage",
*iso_args,
"-o",
iso_filename,
tmp_dir,
],
capture_output=True,
check=True,
)
except CalledProcessError as error:
logging.warning("Executed shell command: %s", " ".join(error.cmd))
logging.warning("Got error: %s", error.stderr.decode("utf-8"))
return {"status": False, "msg": error.stderr.decode("utf-8")}
parameters = {
"value": " ".join(iso_args)
}
return {
"status": True,
"return_code": ReturnCodes.DOWNLOADFILETOISO_SUCCESS,
"parameters": parameters,
"file_name": iso_filename,
}
# noinspection PyMethodMayBeStatic
def download_to_dir(self, url, save_dir, file_name):
"""
Takes (str) url, (str) save_dir, (str) file_name
Returns (dict) with (bool) status and (str) msg
"""
logging.info("Making a request to download %s", url)
try:
with requests.get(url, stream=True, headers={"User-Agent": "Mozilla/5.0"}) as req:
req.raise_for_status()
with open(f"{save_dir}/{file_name}", "wb") as download:
for chunk in req.iter_content(chunk_size=8192):
download.write(chunk)
except requests.exceptions.RequestException as error:
logging.warning("Request failed: %s", str(error))
return {"status": False, "msg": str(error)}
logging.info("Response encoding: %s", req.encoding)
logging.info("Response content-type: %s", req.headers["content-type"])
logging.info("Response status code: %s", req.status_code)
parameters = {
"file_name": file_name,
"save_dir": save_dir
}
return {
"status": True,
"return_code": ReturnCodes.DOWNLOADTODIR_SUCCESS,
"parameters": parameters,
}
def write_config(self, file_name):
"""
Takes (str) file_name
Returns (dict) with (bool) status and (str) msg
"""
file_name = f"{CFG_DIR}/{file_name}"
try:
with open(file_name, "w", encoding="ISO-8859-1") as json_file:
version = self.ractl.get_server_info()["version"]
devices = self.ractl.list_devices()["device_list"]
for device in devices:
# Remove keys that we don't want to store in the file
del device["status"]
del device["file"]
# It's cleaner not to store an empty parameter for every device without media
if device["image"] == "":
device["image"] = None
# RaSCSI product names will be generated on the fly by RaSCSI
if device["vendor"] == "RaSCSI":
device["vendor"] = device["product"] = device["revision"] = None
# A block size of 0 is how RaSCSI indicates N/A for block size
if device["block_size"] == 0:
device["block_size"] = None
# Convert to a data type that can be serialized
device["params"] = dict(device["params"])
reserved_ids_and_memos = []
reserved_ids = self.ractl.get_reserved_ids()["ids"]
for scsi_id in reserved_ids:
reserved_ids_and_memos.append({"id": scsi_id,
"memo": RESERVATIONS[int(scsi_id)]})
dump(
{"version": version,
"devices": devices,
"reserved_ids": reserved_ids_and_memos},
json_file,
indent=4
)
parameters = {
"target_path": file_name
}
return {
"status": True,
"return_code": ReturnCodes.WRITEFILE_SUCCESS,
"parameters": parameters,
}
except (IOError, ValueError, EOFError, TypeError) as error:
logging.error(str(error))
self.delete_file(file_name)
return {"status": False, "msg": str(error)}
        except Exception:
logging.error("Could not write to file: %s", file_name)
self.delete_file(file_name)
parameters = {
"file_name": file_name
}
return {
"status": False,
"return_code": ReturnCodes.WRITEFILE_COULD_NOT_WRITE,
"parameters": parameters,
}
def read_config(self, file_name):
"""
Takes (str) file_name
Returns (dict) with (bool) status and (str) msg
"""
file_name = f"{CFG_DIR}/{file_name}"
try:
with open(file_name, encoding="ISO-8859-1") as json_file:
config = load(json_file)
# If the config file format changes again in the future,
# introduce more sophisticated format detection logic here.
if isinstance(config, dict):
self.ractl.detach_all()
ids_to_reserve = []
for item in config["reserved_ids"]:
ids_to_reserve.append(item["id"])
RESERVATIONS[int(item["id"])] = item["memo"]
self.ractl.reserve_scsi_ids(ids_to_reserve)
for row in config["devices"]:
kwargs = {
"device_type": row["device_type"],
"unit": int(row["unit"]),
"vendor": row["vendor"],
"product": row["product"],
"revision": row["revision"],
"block_size": row["block_size"],
"params": dict(row["params"]),
}
if row["image"]:
kwargs["params"]["file"] = row["image"]
self.ractl.attach_device(row["id"], **kwargs)
# The config file format in RaSCSI 21.10 is using a list data type at the top level.
# If future config file formats return to the list data type,
# introduce more sophisticated format detection logic here.
elif isinstance(config, list):
self.ractl.detach_all()
for row in config:
kwargs = {
"device_type": row["device_type"],
"image": row["image"],
"unit": int(row["un"]),
"vendor": row["vendor"],
"product": row["product"],
"revision": row["revision"],
"block_size": row["block_size"],
"params": dict(row["params"]),
}
if row["image"]:
kwargs["params"]["file"] = row["image"]
self.ractl.attach_device(row["id"], **kwargs)
logging.warning("%s is in an obsolete config file format", file_name)
else:
return {"status": False,
"return_code": ReturnCodes.READCONFIG_INVALID_CONFIG_FILE_FORMAT}
parameters = {
"file_name": file_name
}
return {
"status": True,
"return_code": ReturnCodes.READCONFIG_SUCCESS,
"parameters": parameters
}
except (IOError, ValueError, EOFError, TypeError) as error:
logging.error(str(error))
return {"status": False, "msg": str(error)}
        except Exception:
logging.error("Could not read file: %s", file_name)
parameters = {
"file_name": file_name
}
return {
"status": False,
"return_code": ReturnCodes.READCONFIG_COULD_NOT_READ,
"parameters": parameters
}
def write_drive_properties(self, file_name, conf):
"""
Writes a drive property configuration file to the config dir.
Takes file name base (str) and (list of dicts) conf as arguments
Returns (dict) with (bool) status and (str) msg
"""
file_path = f"{CFG_DIR}/{file_name}"
try:
with open(file_path, "w") as json_file:
dump(conf, json_file, indent=4)
parameters = {
"target_path": file_path
}
return {
"status": True,
"return_code": ReturnCodes.WRITEFILE_SUCCESS,
"parameters": parameters,
}
except (IOError, ValueError, EOFError, TypeError) as error:
logging.error(str(error))
self.delete_file(file_path)
return {"status": False, "msg": str(error)}
        except Exception:
logging.error("Could not write to file: %s", file_path)
self.delete_file(file_path)
parameters = {
"target_path": file_path
}
return {
"status": False,
"return_code": ReturnCodes.WRITEFILE_COULD_NOT_WRITE,
"parameters": parameters,
}
# noinspection PyMethodMayBeStatic
def read_drive_properties(self, file_path):
"""
Reads drive properties from json formatted file.
Takes (str) file_path as argument.
Returns (dict) with (bool) status, (str) msg, (dict) conf
"""
try:
with open(file_path) as json_file:
conf = load(json_file)
parameters = {
"file_path": file_path
}
return {
"status": True,
"return_codes": ReturnCodes.READDRIVEPROPS_SUCCESS,
"parameters": parameters,
"conf": conf,
}
except (IOError, ValueError, EOFError, TypeError) as error:
logging.error(str(error))
return {"status": False, "msg": str(error)}
        except Exception:
logging.error("Could not read file: %s", file_path)
parameters = {
"file_path": file_path
}
return {
"status": False,
"return_codes": ReturnCodes.READDRIVEPROPS_COULD_NOT_READ,
"parameters": parameters,
}
# noinspection PyMethodMayBeStatic
async def run_async(self, program, args):
"""
        Takes (str) program and (list) args with the command and its arguments to execute
        Runs the command asynchronously without a shell and captures its output
Returns (dict) with (int) returncode, (str) stdout, (str) stderr
"""
proc = await asyncio.create_subprocess_exec(
program,
*args,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE)
stdout, stderr = await proc.communicate()
logging.info("Executed command \"%s %s\" with status code %d", program, " ".join(args), proc.returncode)
if stdout:
stdout = stdout.decode()
logging.info("stdout: %s", stdout)
if stderr:
stderr = stderr.decode()
logging.info("stderr: %s", stderr)
return {"returncode": proc.returncode, "stdout": stdout, "stderr": stderr}
# noinspection PyMethodMayBeStatic
@lru_cache(maxsize=32)
def _get_archive_info(self, file_path, **kwargs):
"""
        Cached wrapper around unarchiver.inspect_archive to improve performance,
        e.g. on the index screen. Extra keyword arguments (such as _cache_extra_key)
        become part of the lru_cache key, so callers can force a fresh inspection
        when the file on disk changes.
"""
        # unarchiver.LsarCommandError and unarchiver.LsarOutputError propagate to the
        # caller, which handles them (see list_images above).
        return unarchiver.inspect_archive(file_path)