New class method for creating image file with python; use python file… (#1079)

* New class method for creating image file with python; use python file operations across the board.

Also fixed several bugs:

Added check that a file exists before copying/moving/creating
Check that parent dir exists always passed
Incorrect return message token
This commit is contained in:
Daniel Markstedt 2023-01-26 21:51:59 -08:00 committed by GitHub
parent f15baec58e
commit 139a6ec371
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 53 additions and 91 deletions

View File

@ -65,8 +65,7 @@ class FileCmds:
files_list = []
for file_path, _dirs, files in walk(dir_path):
# Only list selected file types
# TODO: Refactor for readability?
files = [f for f in files if f.lower().endswith(file_types)]
files = [file for file in files if file.lower().endswith(file_types)]
files_list.extend(
[(file, path.getsize(path.join(file_path, file))) for file in files],
)
@ -160,87 +159,11 @@ class FileCmds:
return {"status": result.status, "msg": result.msg, "files": files}
def create_new_image(self, file_name, file_type, size):
"""
Takes (str) file_name, (str) file_type, and (int) size
Sends a CREATE_IMAGE command to the server
Returns (dict) with (bool) status and (str) msg
"""
command = proto.PbCommand()
command.operation = proto.PbOperation.CREATE_IMAGE
command.params["token"] = self.token
command.params["locale"] = self.locale
command.params["file"] = f"{file_name}.{file_type}"
command.params["size"] = str(size)
command.params["read_only"] = "false"
data = self.send_pb_command(command)
result = proto.PbResult()
result.ParseFromString(data)
return {"status": result.status, "msg": result.msg}
def delete_image(self, file_name):
"""
Takes (str) file_name
Sends a DELETE_IMAGE command to the server
Returns (dict) with (bool) status and (str) msg
"""
command = proto.PbCommand()
command.operation = proto.PbOperation.DELETE_IMAGE
command.params["token"] = self.piscsi.token
command.params["locale"] = self.piscsi.locale
command.params["file"] = file_name
data = self.send_pb_command(command)
result = proto.PbResult()
result.ParseFromString(data)
return {"status": result.status, "msg": result.msg}
def rename_image(self, file_name, new_file_name):
"""
Takes (str) file_name, (str) new_file_name
Sends a RENAME_IMAGE command to the server
Returns (dict) with (bool) status and (str) msg
"""
command = proto.PbCommand()
command.operation = proto.PbOperation.RENAME_IMAGE
command.params["token"] = self.piscsi.token
command.params["locale"] = self.piscsi.locale
command.params["from"] = file_name
command.params["to"] = new_file_name
data = self.send_pb_command(command)
result = proto.PbResult()
result.ParseFromString(data)
return {"status": result.status, "msg": result.msg}
def copy_image(self, file_name, new_file_name):
"""
Takes (str) file_name, (str) new_file_name
Sends a COPY_IMAGE command to the server
Returns (dict) with (bool) status and (str) msg
"""
command = proto.PbCommand()
command.operation = proto.PbOperation.COPY_IMAGE
command.params["token"] = self.piscsi.token
command.params["locale"] = self.piscsi.locale
command.params["from"] = file_name
command.params["to"] = new_file_name
data = self.send_pb_command(command)
result = proto.PbResult()
result.ParseFromString(data)
return {"status": result.status, "msg": result.msg}
# noinspection PyMethodMayBeStatic
def delete_file(self, file_path):
"""
Takes (Path) file_path for the file to delete
Returns (dict) with (bool) status and (str) msg
Returns (dict) with (bool) status, (str) msg, (dict) parameters
"""
parameters = {"file_path": file_path}
@ -263,10 +186,10 @@ class FileCmds:
Takes:
- (Path) file_path for the file to rename
- (Path) target_path for the name to rename
Returns (dict) with (bool) status and (str) msg
Returns (dict) with (bool) status, (str) msg, (dict) parameters
"""
parameters = {"target_path": target_path}
if target_path.parent.exists:
if target_path.parent.exists() and not target_path.exists():
file_path.rename(target_path)
return {
"status": True,
@ -285,10 +208,10 @@ class FileCmds:
Takes:
- (Path) file_path for the file to copy from
- (Path) target_path for the name to copy to
Returns (dict) with (bool) status and (str) msg
Returns (dict) with (bool) status, (str) msg, (dict) parameters
"""
parameters = {"target_path": target_path}
if target_path.parent.exists:
if target_path.parent.exists() and not target_path.exists():
copyfile(str(file_path), str(target_path))
return {
"status": True,
@ -297,7 +220,30 @@ class FileCmds:
}
return {
"status": False,
"return_code": ReturnCodes.WRITEFILE_UNABLE_TO_WRITE,
"return_code": ReturnCodes.WRITEFILE_COULD_NOT_WRITE,
"parameters": parameters,
}
def create_empty_image(self, file_path, size):
"""
Takes (Path) file_path and (int) size in bytes
Creates a new empty binary file to use as image
Returns (dict) with (bool) status, (str) msg, (dict) parameters
"""
parameters = {"target_path": file_path}
if file_path.parent.exists() and not file_path.exists():
try:
with open(f"{file_path}", "wb") as out:
out.seek(size - 1)
out.write(b"\0")
except OSError as error:
return {"status": False, "msg": str(error)}
return {"status": True, "msg": ""}
return {
"status": False,
"return_code": ReturnCodes.WRITEFILE_COULD_NOT_WRITE,
"parameters": parameters,
}

View File

@ -399,11 +399,12 @@ def drive_create():
)
# Creating the image file
process = file_cmd.create_new_image(
file_name,
properties["file_type"],
server_info = piscsi_cmd.get_server_info()
process = file_cmd.create_empty_image(
Path(server_info["image_dir"]) / f"{file_name}.{properties['file_type']}",
properties["size"],
)
process = ReturnCodeMapper.add_msg(process)
if not process["status"]:
return response(error=True, message=process["msg"])
@ -1007,7 +1008,10 @@ def create_file():
if not safe_path["status"]:
return response(error=True, message=safe_path["msg"])
full_file_name = f"{file_name}.{file_type}"
process = file_cmd.create_new_image(str(file_name), file_type, size)
server_info = piscsi_cmd.get_server_info()
process = file_cmd.create_empty_image(Path(server_info["image_dir"]) / full_file_name, size)
process = ReturnCodeMapper.add_msg(process)
if not process["status"]:
return response(error=True, message=process["msg"])
@ -1141,7 +1145,9 @@ def delete():
safe_path = is_safe_path(file_name)
if not safe_path["status"]:
return response(error=True, message=safe_path["msg"])
process = file_cmd.delete_image(str(file_name))
server_info = piscsi_cmd.get_server_info()
process = file_cmd.delete_file(Path(server_info["image_dir"]) / file_name)
process = ReturnCodeMapper.add_msg(process)
if not process["status"]:
return response(error=True, message=process["msg"])
@ -1182,7 +1188,12 @@ def rename():
safe_path = is_safe_path(new_file_name)
if not safe_path["status"]:
return response(error=True, message=safe_path["msg"])
process = file_cmd.rename_image(str(file_name), str(new_file_name))
server_info = piscsi_cmd.get_server_info()
process = file_cmd.rename_file(
Path(server_info["image_dir"]) / str(file_name),
Path(server_info["image_dir"]) / str(new_file_name),
)
process = ReturnCodeMapper.add_msg(process)
if not process["status"]:
return response(error=True, message=process["msg"])
@ -1224,7 +1235,12 @@ def copy():
safe_path = is_safe_path(new_file_name)
if not safe_path["status"]:
return response(error=True, message=safe_path["msg"])
process = file_cmd.copy_image(str(file_name), str(new_file_name))
server_info = piscsi_cmd.get_server_info()
process = file_cmd.copy_file(
Path(server_info["image_dir"]) / str(file_name),
Path(server_info["image_dir"]) / str(new_file_name),
)
process = ReturnCodeMapper.add_msg(process)
if not process["status"]:
return response(error=True, message=process["msg"])