""" Module for methods reading from and writing to the file system """ import os import logging from pathlib import PurePath from ractl_cmds import ( get_server_info, get_reserved_ids, attach_image, detach_all, list_devices, reserve_scsi_ids, ) from socket_cmds import send_pb_command from settings import CFG_DIR, CONFIG_FILE_SUFFIX, PROPERTIES_SUFFIX, RESERVATIONS import rascsi_interface_pb2 as proto def list_files(file_types, dir_path): """ Takes a (list) or (tuple) of (str) file_types - e.g. ('hda', 'hds') Returns (list) of (list)s files_list: index 0 is (str) file name and index 1 is (int) size in bytes """ files_list = [] for path, dirs, files in os.walk(dir_path): # Only list selected file types files = [f for f in files if f.lower().endswith(file_types)] files_list.extend( [ ( file, os.path.getsize(os.path.join(path, file)) ) for file in files ] ) return files_list def list_config_files(): """ Finds fils with file ending CONFIG_FILE_SUFFIX in CFG_DIR. Returns a (list) of (str) files_list """ files_list = [] for root, dirs, files in os.walk(CFG_DIR): for file in files: if file.endswith("." + CONFIG_FILE_SUFFIX): files_list.append(file) return files_list def list_images(): """ Sends a IMAGE_FILES_INFO command to the server Returns a (dict) with (bool) status, (str) msg, and (list) of (dict)s files """ command = proto.PbCommand() command.operation = proto.PbOperation.DEFAULT_IMAGE_FILES_INFO data = send_pb_command(command.SerializeToString()) result = proto.PbResult() result.ParseFromString(data) # Get a list of all *.properties files in CFG_DIR prop_data = list_files(PROPERTIES_SUFFIX, CFG_DIR) prop_files = [PurePath(x[0]).stem for x in prop_data] from zipfile import ZipFile, is_zipfile server_info = get_server_info() files = [] for file in result.image_files_info.image_files: # Add properties meta data for the image, if applicable if file.name in prop_files: process = read_drive_properties(f"{CFG_DIR}/{file.name}.{PROPERTIES_SUFFIX}") prop = process["conf"] else: prop = False if file.name.lower().endswith(".zip"): zip_path = f"{server_info['image_dir']}/{file.name}" if is_zipfile(zip_path): zipfile = ZipFile(zip_path) # Get a list of (str) containing all zipfile members zip_members = zipfile.namelist() # Strip out directories from the list zip_members = [x for x in zip_members if not x.endswith("/")] else: logging.warning("%s is an invalid zip file", zip_path) zip_members = False else: zip_members = False size_mb = "{:,.1f}".format(file.size / 1024 / 1024) dtype = proto.PbDeviceType.Name(file.type) files.append({ "name": file.name, "size": file.size, "size_mb": size_mb, "detected_type": dtype, "prop": prop, "zip_members": zip_members, }) return {"status": result.status, "msg": result.msg, "files": files} def create_new_image(file_name, file_type, size): """ Takes (str) file_name, (str) file_type, and (int) size Sends a CREATE_IMAGE command to the server Returns (dict) with (bool) status and (str) msg """ command = proto.PbCommand() command.operation = proto.PbOperation.CREATE_IMAGE command.params["file"] = file_name + "." + file_type command.params["size"] = str(size) command.params["read_only"] = "false" data = send_pb_command(command.SerializeToString()) result = proto.PbResult() result.ParseFromString(data) return {"status": result.status, "msg": result.msg} def delete_image(file_name): """ Takes (str) file_name Sends a DELETE_IMAGE command to the server Returns (dict) with (bool) status and (str) msg """ command = proto.PbCommand() command.operation = proto.PbOperation.DELETE_IMAGE command.params["file"] = file_name data = send_pb_command(command.SerializeToString()) result = proto.PbResult() result.ParseFromString(data) return {"status": result.status, "msg": result.msg} def delete_file(file_path): """ Takes (str) file_path with the full path to the file to delete Returns (dict) with (bool) status and (str) msg """ if os.path.exists(file_path): os.remove(file_path) return {"status": True, "msg": f"File deleted: {file_path}"} return {"status": False, "msg": f"File to delete not found: {file_path}"} def rename_file(file_path, target_path): """ Takes (str) file_path and (str) target_path Returns (dict) with (bool) status and (str) msg """ if os.path.exists(PurePath(target_path).parent): os.rename(file_path, target_path) return {"status": True, "msg": f"File moved to: {target_path}"} return {"status": False, "msg": f"Unable to move to: {target_path}"} def unzip_file(file_name, member=False, members=False): """ Takes (str) file_name, optional (str) member, optional (list) of (str) members file_name is the name of the zip file to unzip member is the full path to the particular file in the zip file to unzip members contains all of the full paths to each of the zip archive members Returns (dict) with (boolean) status and (list of str) msg """ from subprocess import run from re import escape server_info = get_server_info() prop_flag = False if not member: unzip_proc = run( ["unzip", "-d", server_info["image_dir"], "-n", "-j", \ f"{server_info['image_dir']}/{file_name}"], capture_output=True ) for path in members: if path.endswith(PROPERTIES_SUFFIX): name = PurePath(path).name rename_file(f"{server_info['image_dir']}/{name}", f"{CFG_DIR}/{name}") prop_flag = True else: unzip_proc = run( ["unzip", "-d", server_info["image_dir"], "-n", "-j", \ f"{server_info['image_dir']}/{file_name}", escape(member)], capture_output=True ) # Attempt to unzip a properties file in the same archive dir unzip_prop = run( ["unzip", "-d", CFG_DIR, "-n", "-j", \ f"{server_info['image_dir']}/{file_name}", escape(member) + "." + PROPERTIES_SUFFIX], capture_output=False ) if unzip_prop.returncode == 0: prop_flag = True if unzip_proc.returncode != 0: stderr = unzip_proc.stderr.decode("utf-8") logging.warning("Unzipping failed: %s", stderr) return {"status": False, "msg": stderr} from re import findall unzipped = findall( "(?:inflating|extracting):(.+)\n", unzip_proc.stdout.decode("utf-8") ) return {"status": True, "msg": unzipped, "prop_flag": prop_flag} def download_file_to_iso(url): """ Takes (int) scsi_id and (str) url Returns (dict) with (bool) status and (str) msg """ from time import time from subprocess import run server_info = get_server_info() file_name = PurePath(url).name tmp_ts = int(time()) tmp_dir = "/tmp/" + str(tmp_ts) + "/" os.mkdir(tmp_dir) tmp_full_path = tmp_dir + file_name iso_filename = f"{server_info['image_dir']}/{file_name}.iso" req_proc = download_to_dir(url, tmp_dir) if not req_proc["status"]: return {"status": False, "msg": req_proc["msg"]} iso_proc = run( ["genisoimage", "-hfs", "-o", iso_filename, tmp_full_path], capture_output=True ) if iso_proc.returncode != 0: return {"status": False, "msg": iso_proc.stderr.decode("utf-8")} return {"status": True, "msg": iso_proc.stdout.decode("utf-8"), "file_name": iso_filename} def download_to_dir(url, save_dir): """ Takes (str) url, (str) save_dir Returns (dict) with (bool) status and (str) msg """ import requests file_name = PurePath(url).name logging.info("Making a request to download %s", url) try: with requests.get(url, stream=True, headers={"User-Agent": "Mozilla/5.0"}) as req: req.raise_for_status() with open(f"{save_dir}/{file_name}", "wb") as download: for chunk in req.iter_content(chunk_size=8192): download.write(chunk) except requests.exceptions.RequestException as error: logging.warning("Request failed: %s", str(error)) return {"status": False, "msg": str(error)} logging.info("Response encoding: %s", req.encoding) logging.info("Response content-type: %s", req.headers["content-type"]) logging.info("Response status code: %s", req.status_code) return {"status": True, "msg": f"File downloaded from {url} to {save_dir}"} def write_config(file_name): """ Takes (str) file_name Returns (dict) with (bool) status and (str) msg """ from json import dump file_name = f"{CFG_DIR}/{file_name}" try: with open(file_name, "w") as json_file: version = get_server_info()["version"] devices = list_devices()["device_list"] for device in devices: # Remove keys that we don't want to store in the file del device["status"] del device["file"] # It's cleaner not to store an empty parameter for every device without media if device["image"] == "": device["image"] = None # RaSCSI product names will be generated on the fly by RaSCSI if device["vendor"] == "RaSCSI": device["vendor"] = device["product"] = device["revision"] = None # A block size of 0 is how RaSCSI indicates N/A for block size if device["block_size"] == 0: device["block_size"] = None # Convert to a data type that can be serialized device["params"] = dict(device["params"]) reserved_ids_and_memos = [] reserved_ids = get_reserved_ids()["ids"] for scsi_id in reserved_ids: reserved_ids_and_memos.append({"id": scsi_id, "memo": RESERVATIONS[int(scsi_id)]}) dump( {"version": version, "devices": devices, "reserved_ids": reserved_ids_and_memos}, json_file, indent=4 ) return {"status": True, "msg": f"Saved config to {file_name}"} except (IOError, ValueError, EOFError, TypeError) as error: logging.error(str(error)) delete_file(file_name) return {"status": False, "msg": str(error)} except: logging.error("Could not write to file: %s", file_name) delete_file(file_name) return {"status": False, "msg": f"Could not write to file: {file_name}"} def read_config(file_name): """ Takes (str) file_name Returns (dict) with (bool) status and (str) msg """ from json import load file_name = f"{CFG_DIR}/{file_name}" try: with open(file_name) as json_file: config = load(json_file) # If the config file format changes again in the future, # introduce more sophisticated format detection logic here. if isinstance(config, dict): detach_all() ids_to_reserve = [] for item in config["reserved_ids"]: ids_to_reserve.append(item["id"]) RESERVATIONS[int(item["id"])] = item["memo"] reserve_scsi_ids(ids_to_reserve) for row in config["devices"]: kwargs = { "device_type": row["device_type"], "image": row["image"], "unit": int(row["unit"]), "vendor": row["vendor"], "product": row["product"], "revision": row["revision"], "block_size": row["block_size"], } params = dict(row["params"]) for param in params.keys(): kwargs[param] = params[param] attach_image(row["id"], **kwargs) # The config file format in RaSCSI 21.10 is using a list data type at the top level. # If future config file formats return to the list data type, # introduce more sophisticated format detection logic here. elif isinstance(config, list): detach_all() for row in config: kwargs = { "device_type": row["device_type"], "image": row["image"], # "un" for backwards compatibility "unit": int(row["un"]), "vendor": row["vendor"], "product": row["product"], "revision": row["revision"], "block_size": row["block_size"], } params = dict(row["params"]) for param in params.keys(): kwargs[param] = params[param] attach_image(row["id"], **kwargs) else: return {"status": False, "msg": "Invalid config file format."} return {"status": True, "msg": f"Loaded config from: {file_name}"} except (IOError, ValueError, EOFError, TypeError) as error: logging.error(str(error)) return {"status": False, "msg": str(error)} except: logging.error("Could not read file: %s", file_name) return {"status": False, "msg": f"Could not read file: {file_name}"} def write_drive_properties(file_name, conf): """ Writes a drive property configuration file to the config dir. Takes file name base (str) and (list of dicts) conf as arguments Returns (dict) with (bool) status and (str) msg """ from json import dump file_path = f"{CFG_DIR}/{file_name}" try: with open(file_path, "w") as json_file: dump(conf, json_file, indent=4) return {"status": True, "msg": f"Created file: {file_path}"} except (IOError, ValueError, EOFError, TypeError) as error: logging.error(str(error)) delete_file(file_path) return {"status": False, "msg": str(error)} except: logging.error("Could not write to file: %s", file_path) delete_file(file_path) return {"status": False, "msg": f"Could not write to file: {file_path}"} def read_drive_properties(path_name): """ Reads drive properties from json formatted file. Takes (str) path_name as argument. Returns (dict) with (bool) status, (str) msg, (dict) conf """ from json import load try: with open(path_name) as json_file: conf = load(json_file) return {"status": True, "msg": f"Read from file: {path_name}", "conf": conf} except (IOError, ValueError, EOFError, TypeError) as error: logging.error(str(error)) return {"status": False, "msg": str(error)} except: logging.error("Could not read file: %s", path_name) return {"status": False, "msg": f"Could not read file: {path_name}"}