
323 lines
10 KiB

import os
import logging
from ractl_cmds import (
from settings import *
import rascsi_interface_pb2 as proto
def list_files(file_types, dir_path):
Takes a list or tuple of str file_types - e.g. ('hda', 'hds')
Returns list of lists files_list:
index 0 is str file name and index 1 is int size in bytes
files_list = []
for path, dirs, files in os.walk(dir_path):
# Only list selected file types
files = [f for f in files if f.lower().endswith(file_types)]
os.path.getsize(os.path.join(path, file))
for file in files
return files_list
def list_config_files():
Returns a list of RaSCSI config files in cfg_dir:
list of str files_list
files_list = []
for root, dirs, files in os.walk(cfg_dir):
for file in files:
if file.endswith(".json"):
return files_list
def list_images():
Sends a IMAGE_FILES_INFO command to the server
Returns a dict with boolean status, str msg, and list of dicts files
command = proto.PbCommand()
command.operation = proto.PbOperation.DEFAULT_IMAGE_FILES_INFO
data = send_pb_command(command.SerializeToString())
result = proto.PbResult()
# Get a list of all *.properties files in cfg_dir
from pathlib import PurePath
prop_data = list_files(PROPERTIES_SUFFIX, cfg_dir)
prop_files = [PurePath(x[0]).stem for x in prop_data]
files = []
for f in result.image_files_info.image_files:
# Add flag for whether an image file has an associated *.properties file
if f.name in prop_files:
prop = True
prop = False
size_mb = "{:,.1f}".format(f.size / 1024 / 1024)
dtype = proto.PbDeviceType.Name(f.type)
"name": f.name,
"size": f.size,
"size_mb": size_mb,
"detected_type": dtype,
"prop": prop,
return {"status": result.status, "msg": result.msg, "files": files}
def create_new_image(file_name, file_type, size):
Takes str file_name, str file_type, and int size
Sends a CREATE_IMAGE command to the server
Returns dict with boolean status and str msg
command = proto.PbCommand()
command.operation = proto.PbOperation.CREATE_IMAGE
command.params["file"] = file_name + "." + file_type
command.params["size"] = str(size)
command.params["read_only"] = "false"
data = send_pb_command(command.SerializeToString())
result = proto.PbResult()
return {"status": result.status, "msg": result.msg}
def delete_image(file_name):
Takes str file_name
Sends a DELETE_IMAGE command to the server
Returns dict with boolean status and str msg
command = proto.PbCommand()
command.operation = proto.PbOperation.DELETE_IMAGE
command.params["file"] = file_name
data = send_pb_command(command.SerializeToString())
result = proto.PbResult()
return {"status": result.status, "msg": result.msg}
def delete_file(file_path):
Takes str file_path with the full path to the file to delete
Returns dict with boolean status and str msg
if os.path.exists(file_path):
return {"status": True, "msg": "File deleted"}
return {"status": False, "msg": "Could not delete file"}
def unzip_file(file_name):
Takes str file_name
Returns dict with boolean status and str msg
from subprocess import run
server_info = get_server_info()
unzip_proc = run(
["unzip", "-d", server_info["image_dir"], "-o", "-j", \
server_info["image_dir"] + file_name], capture_output=True
if unzip_proc.returncode != 0:
logging.warning(f"Unzipping failed: {unzip_proc}")
return {"status": False, "msg": unzip_proc}
return {"status": True, "msg": f"{file_name} unzipped"}
def download_file_to_iso(scsi_id, url):
Takes int scsi_id and str url
Returns dict with boolean status and str msg
import urllib.request
import urllib.error as error
import time
from subprocess import run
server_info = get_server_info()
file_name = url.split("/")[-1]
tmp_ts = int(time.time())
tmp_dir = "/tmp/" + str(tmp_ts) + "/"
tmp_full_path = tmp_dir + file_name
iso_filename = server_info["image_dir"] + file_name + ".iso"
urllib.request.urlretrieve(url, tmp_full_path)
except (error.URLError, error.HTTPError, error.ContentTooShortError) as e:
return {"status": False, "msg": str(e)}
return {"status": False, "msg": "Error loading the URL"}
# iso_filename = make_cd(tmp_full_path, None, None) # not working yet
iso_proc = run(
["genisoimage", "-hfs", "-o", iso_filename, tmp_full_path], capture_output=True
if iso_proc.returncode != 0:
return {"status": False, "msg": iso_proc}
process = attach_image(scsi_id, type="SCCD", image=iso_filename)
return {"status": process["status"], "msg": process["msg"]}
def download_image(url):
Takes str url
Returns dict with boolean status and str msg
import urllib.request
import urllib.error as error
server_info = get_server_info()
file_name = url.split("/")[-1]
full_path = server_info["image_dir"] + file_name
urllib.request.urlretrieve(url, full_path)
return {"status": True, "msg": "Downloaded the URL"}
except (error.URLError, error.HTTPError, error.ContentTooShortError) as e:
return {"status": False, "msg": str(e)}
return {"status": False, "msg": "Error loading the URL"}
def write_config(file_name):
Takes str file_name
Returns dict with boolean status and str msg
from json import dump
file_name = cfg_dir + file_name
with open(file_name, "w") as json_file:
devices = list_devices()["device_list"]
for device in devices:
# Remove keys that we don't want to store in the file
del device["status"]
del device["file"]
# It's cleaner not to store an empty parameter for every device without media
if device["image"] == "":
device["image"] = None
# RaSCSI product names will be generated on the fly by RaSCSI
if device["vendor"] == "RaSCSI":
device["vendor"] = device["product"] = device["revision"] = None
# A block size of 0 is how RaSCSI indicates N/A for block size
if device["block_size"] == 0:
device["block_size"] = None
# Convert to a data type that can be serialized
device["params"] = dict(device["params"])
dump(devices, json_file, indent=4)
return {"status": True, "msg": f"Successfully wrote to file: {file_name}"}
except (IOError, ValueError, EOFError, TypeError) as e:
return {"status": False, "msg": str(e)}
logging.error(f"Could not write to file: {file_name}")
return {"status": False, "msg": f"Could not write to file: {file_name}"}
def read_config(file_name):
Takes str file_name
Returns dict with boolean status and str msg
from json import load
file_name = cfg_dir + file_name
with open(file_name) as json_file:
devices = load(json_file)
for row in devices:
kwargs = {"device_type": row["device_type"], \
"image": row["image"], "unit": int(row["un"]), \
"vendor": row["vendor"], "product": row["product"], \
"revision": row["revision"], "block_size": row["block_size"]}
params = dict(row["params"])
for p in params.keys():
kwargs[p] = params[p]
process = attach_image(row["id"], **kwargs)
if process["status"] == True:
return {"status": process["status"], "msg": f"Successfully read from file: {file_name}"}
return {"status": process["status"], "msg": process["msg"]}
except (IOError, ValueError, EOFError, TypeError) as e:
return {"status": False, "msg": str(e)}
logging.error(f"Could not read file: {file_name}")
return {"status": False, "msg": f"Could not read file: {file_name}"}
def write_drive_properties(file_name, conf):
Writes a drive property configuration file to the images dir.
Takes file name base (str) and conf (list of dicts) as arguments
Returns dict with boolean status and str msg
from json import dump
with open(cfg_dir + file_name, "w") as json_file:
dump(conf, json_file, indent=4)
return {"status": True, "msg": f"Successfully wrote to file: {file_name}"}
except (IOError, ValueError, EOFError, TypeError) as e:
return {"status": False, "msg": str(e)}
logging.error(f"Could not write to file: {file_name}")
return {"status": False, "msg": f"Could not write to file: {file_name}"}
def read_drive_properties(path_name):
Reads drive properties to any dir.
Either ones deployed to the images dir, or the canonical database.
Takes file path and bas (str) as argument
Returns dict with boolean status and str msg
from json import load
with open(path_name) as json_file:
conf = load(json_file)
return {"status": True, "msg": f"Read data from file: {path_name}", "conf": conf}
except (IOError, ValueError, EOFError, TypeError) as e:
return {"status": False, "msg": str(e)}
logging.error(f"Could not read file: {file_name}")
return {"status": False, "msg": f"Could not read file: {path_name}"}