diff --git a/easyinstall.sh b/easyinstall.sh index 6bfa87ca..70a9d430 100755 --- a/easyinstall.sh +++ b/easyinstall.sh @@ -52,6 +52,7 @@ VIRTUAL_DRIVER_PATH="$HOME/images" CFG_PATH="$HOME/.config/rascsi" WEB_INSTALL_PATH="$BASE/python/web" OLED_INSTALL_PATH="$BASE/python/oled" +CTRLBOARD_INSTALL_PATH="$BASE/python/ctrlboard" PYTHON_COMMON_PATH="$BASE/python/common" SYSTEMD_PATH="/etc/systemd/system" HFS_FORMAT=/usr/bin/hformat @@ -314,6 +315,17 @@ function stopRaScsiScreen() { fi } +# Stops the rascsi-ctrlboard service if it is running +function stopRaScsiCtrlBoard() { + if [[ -f "$SYSTEMD_PATH/rascsi-ctrlboard.service" ]]; then + SERVICE_RASCSI_CTRLBOARD_RUNNING=0 + sudo systemctl is-active --quiet rascsi-ctrlboard.service >/dev/null 2>&1 || SERVICE_RASCSI_CTRLBOARD_RUNNING=$? + if [[ SERVICE_RASCSI_CTRLBOARD_RUNNING -eq 0 ]]; then + sudo systemctl stop rascsi-ctrlboard.service + fi + fi +} + # disables and removes the old monitor_rascsi service function disableOldRaScsiMonitorService() { if [ -f "$SYSTEMD_PATH/monitor_rascsi.service" ]; then @@ -332,6 +344,40 @@ function disableOldRaScsiMonitorService() { fi } +# disables the rascsi-oled service +function disableRaScsiOledService() { + if [ -f "$SYSTEMD_PATH/rascsi-oled.service" ]; then + SERVICE_RASCSI_OLED_RUNNING=0 + sudo systemctl is-active --quiet rascsi-oled.service >/dev/null 2>&1 || SERVICE_RASCSI_OLED_RUNNING=$? + if [[ $SERVICE_RASCSI_OLED_RUNNING -eq 0 ]]; then + sudo systemctl stop rascsi-oled.service + fi + + SERVICE_RASCSI_OLED_ENABLED=0 + sudo systemctl is-enabled --quiet rascsi-oled.service >/dev/null 2>&1 || SERVICE_RASCSI_OLED_ENABLED=$? + if [[ $SERVICE_RASCSI_OLED_ENABLED -eq 0 ]]; then + sudo systemctl disable rascsi-oled.service + fi + fi +} + +# disables the rascsi-ctrlboard service +function disableRaScsiCtrlBoardService() { + if [ -f "$SYSTEMD_PATH/rascsi-ctrlboard.service" ]; then + SERVICE_RASCSI_CTRLBOARD_RUNNING=0 + sudo systemctl is-active --quiet rascsi-ctrlboard.service >/dev/null 2>&1 || SERVICE_RASCSI_CTRLBOARD_RUNNING=$? + if [[ $SERVICE_RASCSI_CTRLBOARD_RUNNING -eq 0 ]]; then + sudo systemctl stop rascsi-ctrlboard.service + fi + + SERVICE_RASCSI_CTRLBOARD_ENABLED=0 + sudo systemctl is-enabled --quiet rascsi-ctrlboard.service >/dev/null 2>&1 || SERVICE_RASCSI_CTRLBOARD_ENABLED=$? + if [[ $SERVICE_RASCSI_CTRLBOARD_ENABLED -eq 0 ]]; then + sudo systemctl disable rascsi-ctrlboard.service + fi + fi +} + # Stops the macproxy service if it is running function stopMacproxy() { if [ -f "$SYSTEMD_PATH/macproxy.service" ]; then @@ -339,7 +385,7 @@ function stopMacproxy() { fi } -# Starts the rascsi-oled service if installed +# Checks whether the rascsi-oled service is installed function isRaScsiScreenInstalled() { SERVICE_RASCSI_OLED_ENABLED=0 if [[ -f "$SYSTEMD_PATH/rascsi-oled.service" ]]; then @@ -353,7 +399,19 @@ function isRaScsiScreenInstalled() { echo $SERVICE_RASCSI_OLED_ENABLED } -# Starts the rascsi-oled service if installed +# Checks whether the rascsi-ctrlboard service is installed +function isRaScsiCtrlBoardInstalled() { + SERVICE_RASCSI_CTRLBOARD_ENABLED=0 + if [[ -f "$SYSTEMD_PATH/rascsi-ctrlboard.service" ]]; then + sudo systemctl is-enabled --quiet rascsi-ctrlboard.service >/dev/null 2>&1 || SERVICE_RASCSI_CTRLBOARD_ENABLED=$? + else + SERVICE_RASCSI_CTRLBOARD_ENABLED=1 + fi + + echo $SERVICE_RASCSI_CTRLBOARD_ENABLED +} + +# Checks whether the rascsi-oled service is running function isRaScsiScreenRunning() { SERVICE_RASCSI_OLED_RUNNING=0 if [[ -f "$SYSTEMD_PATH/rascsi-oled.service" ]]; then @@ -367,6 +425,19 @@ function isRaScsiScreenRunning() { echo $SERVICE_RASCSI_OLED_RUNNING } +# Checks whether the rascsi-oled service is running +function isRaScsiCtrlBoardRunning() { + SERVICE_RASCSI_CTRLBOARD_RUNNING=0 + if [[ -f "$SYSTEMD_PATH/rascsi-ctrlboard.service" ]]; then + sudo systemctl is-active --quiet rascsi-ctrlboard.service >/dev/null 2>&1 || SERVICE_RASCSI_CTRLBOARD_RUNNING=$? + else + SERVICE_RASCSI_CTRLBOARD_RUNNING=1 + fi + + echo $SERVICE_RASCSI_CTRLBOARD_RUNNING +} + + # Starts the rascsi-oled service if installed function startRaScsiScreen() { if [[ $(isRaScsiScreenInstalled) -eq 0 ]] && [[ $(isRaScsiScreenRunning) -ne 1 ]]; then @@ -375,6 +446,14 @@ function startRaScsiScreen() { fi } +# Starts the rascsi-ctrlboard service if installed +function startRaScsiCtrlBoard() { + if [[ $(isRaScsiCtrlBoardInstalled) -eq 0 ]] && [[ $(isRaScsiCtrlBoardRunning) -ne 1 ]]; then + sudo systemctl start rascsi-ctrlboard.service + showRaScsiCtrlBoardStatus + fi +} + # Starts the macproxy service if installed function startMacproxy() { if [ -f "$SYSTEMD_PATH/macproxy.service" ]; then @@ -398,6 +477,11 @@ function showRaScsiScreenStatus() { systemctl status rascsi-oled | tee } +# Shows status for the rascsi-ctrlboard service +function showRaScsiCtrlBoardStatus() { + systemctl status rascsi-ctrlboard | tee +} + # Shows status for the macproxy service function showMacproxyStatus() { systemctl status macproxy | tee @@ -828,6 +912,7 @@ function installRaScsiScreen() { fi stopRaScsiScreen + disableRaScsiCtrlBoardService updateRaScsiGit sudo apt-get update && sudo apt-get install libjpeg-dev libpng-dev libopenjp2-7-dev i2c-tools raspi-config -y = 0: + active_menu.item_selection -= 1 + else: + active_menu.item_selection = 0 + self._menu_controller.get_menu_renderer().render() + + def update_events(self): + """Method handling non-blocking event handling for the cycle buttons.""" + if self.rascsi_profile_cycler is not None: + result = self.rascsi_profile_cycler.update() + if result is not None: + self.rascsi_profile_cycler = None + self.context_stack = [] + self._menu_controller.segue(result) + if self.rascsi_shutdown_cycler is not None: + self.rascsi_shutdown_cycler.empty_messages = False + result = self.rascsi_shutdown_cycler.update() + if result == "return": + self.rascsi_shutdown_cycler = None + + def handle_button1(self): + """Method for handling the first cycle button (cycle profiles)""" + if self.rascsi_profile_cycler is None: + self.rascsi_profile_cycler = RascsiProfileCycler(self._menu_controller, self.sock_cmd, + self.ractl_cmd, return_entry=True) + else: + self.rascsi_profile_cycler.cycle() + + def handle_button2(self): + """Method for handling the second cycle button (cycle shutdown)""" + if self.rascsi_shutdown_cycler is None: + self.rascsi_shutdown_cycler = RascsiShutdownCycler(self._menu_controller, self.sock_cmd, + self.ractl_cmd) + else: + self.rascsi_shutdown_cycler.cycle() + + def route_rotary_button_handler(self, info_object): + """Method for handling the rotary button press for the menu navigation""" + if info_object is None: + return + + context = info_object["context"] + action = info_object["action"] + handler_function_name = "handle_" + context + "_" + action + try: + handler_function = getattr(self, handler_function_name) + if handler_function is not None: + handler_function(info_object) + except AttributeError: + log = logging.getLogger(__name__) + log.error("Handler function [%s] not found or returned an error. Skipping.", + str(handler_function_name)) + + # noinspection PyUnusedLocal + # pylint: disable=unused-argument + def handle_scsi_id_menu_openactionmenu(self, info_object): + """Method handles the rotary button press with the scsi list to open the action menu.""" + context_object = self._menu_controller.get_active_menu().get_current_info_object() + self.context_stack.append(context_object) + self._menu_controller.segue(CtrlBoardMenuBuilder.ACTION_MENU, context_object=context_object, + transition_attributes=self._menu_renderer_config. + transition_attributes_left) + + # noinspection PyUnusedLocal + # pylint: disable=unused-argument + def handle_action_menu_return(self, info_object): + """Method handles the rotary button press to return from the + action menu to the scsi list.""" + self.context_stack.pop() + self._menu_controller.segue(CtrlBoardMenuBuilder.SCSI_ID_MENU, + transition_attributes=self._menu_renderer_config. + transition_attributes_right) + + # noinspection PyUnusedLocal + # pylint: disable=unused-argument + def handle_action_menu_slot_attachinsert(self, info_object): + """Method handles the rotary button press on attach in the action menu.""" + context_object = self._menu_controller.get_active_menu().context_object + self.context_stack.append(context_object) + self._menu_controller.segue(CtrlBoardMenuBuilder.IMAGES_MENU, context_object=context_object, + transition_attributes=self._menu_renderer_config. + transition_attributes_left) + + # noinspection PyUnusedLocal + def handle_action_menu_slot_detacheject(self, info_object): + """Method handles the rotary button press on detach in the action menu.""" + context_object = self._menu_controller.get_active_menu().context_object + self.detach_eject_scsi_id() + self.context_stack = [] + self._menu_controller.segue(CtrlBoardMenuBuilder.SCSI_ID_MENU, + context_object=context_object, + transition_attributes=self._menu_renderer_config. + transition_attributes_right) + + # noinspection PyUnusedLocal + def handle_action_menu_slot_info(self, info_object): + """Method handles the rotary button press on 'Info' in the action menu.""" + context_object = self._menu_controller.get_active_menu().context_object + self.context_stack.append(context_object) + self._menu_controller.segue(CtrlBoardMenuBuilder.DEVICEINFO_MENU, + transition_attributes=self._menu_renderer_config. + transition_attributes_left, + context_object=context_object) + + # noinspection PyUnusedLocal + def handle_device_info_menu_return(self, info_object): + """Method handles the rotary button press on 'Return' in the info menu.""" + self.context_stack.pop() + context_object = self._menu_controller.get_active_menu().context_object + self._menu_controller.segue(CtrlBoardMenuBuilder.ACTION_MENU, + transition_attributes=self._menu_renderer_config. + transition_attributes_right, + context_object=context_object) + + # noinspection PyUnusedLocal + def handle_action_menu_loadprofile(self, info_object): + """Method handles the rotary button press on 'Load Profile' in the action menu.""" + context_object = self._menu_controller.get_active_menu().context_object + self.context_stack.append(context_object) + self._menu_controller.segue(CtrlBoardMenuBuilder.PROFILES_MENU, + transition_attributes=self._menu_renderer_config. + transition_attributes_left) + + # noinspection PyUnusedLocal + def handle_profiles_menu_loadprofile(self, info_object): + """Method handles the rotary button press in the profile selection menu + for selecting a profile to load.""" + if info_object is not None and "name" in info_object: + file_cmd = FileCmds(sock_cmd=self.sock_cmd, ractl=self.ractl_cmd) + result = file_cmd.read_config(file_name=info_object["name"]) + if result["status"] is True: + self._menu_controller.show_message("Profile loaded!") + else: + self._menu_controller.show_message("Loading failed!") + + self.context_stack = [] + self._menu_controller.segue(CtrlBoardMenuBuilder.SCSI_ID_MENU, + transition_attributes=self._menu_renderer_config. + transition_attributes_right) + + # noinspection PyUnusedLocal + def handle_action_menu_shutdown(self, info_object): + """Method handles the rotary button press on 'Shutdown' in the action menu.""" + self.ractl_cmd.shutdown_pi("system") + self._menu_controller.show_message("Shutting down!", 150) + self._menu_controller.segue(CtrlBoardMenuBuilder.SCSI_ID_MENU, + transition_attributes=self._menu_renderer_config. + transition_attributes_right) + + # noinspection PyUnusedLocal + def handle_images_menu_return(self, info_object): + """Method handles the rotary button press on 'Return' in the image selection menu + (through attach/insert).""" + context_object = self.context_stack.pop() + self._menu_controller.segue(CtrlBoardMenuBuilder.ACTION_MENU, + context_object=context_object, + transition_attributes=self._menu_renderer_config. + transition_attributes_right) + + def handle_images_menu_image_attachinsert(self, info_object): + """Method handles the rotary button press on an image in the image selection menu + (through attach/insert)""" + context_object = self._menu_controller.get_active_menu().context_object + self.attach_insert_scsi_id(info_object) + self.context_stack = [] + self._menu_controller.segue(CtrlBoardMenuBuilder.SCSI_ID_MENU, + context_object=context_object, + transition_attributes=self._menu_renderer_config. + transition_attributes_right) + + def attach_insert_scsi_id(self, info_object): + """Helper method to attach/insert an image on a scsi id given through the menu context""" + image_name = info_object["name"] + device_type = info_object["device_type"] + context_object = self._menu_controller.get_active_menu().context_object + scsi_id = context_object["scsi_id"] + params = {"file": image_name} + result = self.ractl_cmd.attach_device(scsi_id=scsi_id, + device_type=device_type, + params=params) + + if result["status"] is False: + self._menu_controller.show_message("Attach failed!") + else: + self.show_id_action_message(scsi_id, "attached") + + def detach_eject_scsi_id(self): + """Helper method to detach/eject an image on a scsi id given through the menu context""" + context_object = self._menu_controller.get_active_menu().context_object + scsi_id = context_object["scsi_id"] + device_info = self.ractl_cmd.list_devices(scsi_id) + + if not device_info["device_list"]: + return + + device_type = device_info["device_list"][0]["device_type"] + image = device_info["device_list"][0]["image"] + if device_type in ("SAHD", "SCHD", "SCBR", "SCDP", "SCLP", "SCHS"): + result = self.ractl_cmd.detach_by_id(scsi_id) + if result["status"] is True: + self.show_id_action_message(scsi_id, "detached") + else: + self._menu_controller.show_message("Detach failed!") + elif device_type in ("SCRM", "SCMO", "SCCD"): + if image: + result = self.ractl_cmd.eject_by_id(scsi_id) + if result["status"] is True: + self.show_id_action_message(scsi_id, "ejected") + else: + self._menu_controller.show_message("Eject failed!") + else: + result = self.ractl_cmd.detach_by_id(scsi_id) + if result["status"] is True: + self.show_id_action_message(scsi_id, "detached") + else: + self._menu_controller.show_message("Detach failed!") + else: + log = logging.getLogger(__name__) + log.info("Device type '%s' currently unsupported for detach/eject!", str(device_type)) + + def show_id_action_message(self, scsi_id, action: str): + """Helper method for displaying an action message in the case of an exception.""" + self._menu_controller.show_message("ID " + str(scsi_id) + " " + action + "!") diff --git a/python/ctrlboard/src/ctrlboard_event_handler/ctrlboard_print_event_handler.py b/python/ctrlboard/src/ctrlboard_event_handler/ctrlboard_print_event_handler.py new file mode 100644 index 00000000..5992d383 --- /dev/null +++ b/python/ctrlboard/src/ctrlboard_event_handler/ctrlboard_print_event_handler.py @@ -0,0 +1,15 @@ +"""Module for test printing events when buttons from the RaSCSI Control Board are pressed""" +import observer +from ctrlboard_hw.hardware_button import HardwareButton +from ctrlboard_hw.encoder import Encoder + + +# pylint: disable=too-few-public-methods +class CtrlBoardPrintEventHandler(observer.Observer): + """Class implements a basic event handler that prints button presses from the RaSCSI + Control Board hardware.""" + def update(self, updated_object): + if isinstance(updated_object, HardwareButton): + print(updated_object.name + " has been pressed!") + if isinstance(updated_object, Encoder): + print(updated_object.pos) diff --git a/python/ctrlboard/src/ctrlboard_event_handler/rascsi_profile_cycler.py b/python/ctrlboard/src/ctrlboard_event_handler/rascsi_profile_cycler.py new file mode 100644 index 00000000..581e99d5 --- /dev/null +++ b/python/ctrlboard/src/ctrlboard_event_handler/rascsi_profile_cycler.py @@ -0,0 +1,24 @@ +"""Module providing the profile cycler class for the RaSCSI Control Board UI""" +from ctrlboard_menu_builder import CtrlBoardMenuBuilder +from menu.cycler import Cycler + + +class RascsiProfileCycler(Cycler): + """Class implementing the profile cycler for the RaSCSI Control Baord UI""" + + def populate_cycle_entries(self): + cycle_entries = self.file_cmd.list_config_files() + + return cycle_entries + + def perform_selected_entry_action(self, selected_entry): + result = self.file_cmd.read_config(selected_entry) + self._menu_controller.show_timed_mini_message("") + if result["status"] is True: + return CtrlBoardMenuBuilder.SCSI_ID_MENU + + self._menu_controller.show_message("Failed!") + return CtrlBoardMenuBuilder.SCSI_ID_MENU + + def perform_return_action(self): + return CtrlBoardMenuBuilder.SCSI_ID_MENU diff --git a/python/ctrlboard/src/ctrlboard_event_handler/rascsi_shutdown_cycler.py b/python/ctrlboard/src/ctrlboard_event_handler/rascsi_shutdown_cycler.py new file mode 100644 index 00000000..3cd90b27 --- /dev/null +++ b/python/ctrlboard/src/ctrlboard_event_handler/rascsi_shutdown_cycler.py @@ -0,0 +1,31 @@ +"""Module providing the shutdown cycler for the RaSCSI Control Board UI """ +from menu.cycler import Cycler + + +class RascsiShutdownCycler(Cycler): + """Class implementing the shutdown cycler for the RaSCSI Control Board UI""" + + def __init__(self, menu_controller, sock_cmd, ractl_cmd): + super().__init__(menu_controller, sock_cmd, ractl_cmd, return_entry=True, + empty_messages=False) + self.executed_once = False + + def populate_cycle_entries(self): + cycle_entries = ["Shutdown"] + + return cycle_entries + + def perform_selected_entry_action(self, selected_entry): + if self.executed_once is False: + self.executed_once = True + self._menu_controller.show_timed_message("Shutting down...") + self.ractl_cmd.shutdown_pi("system") + return "shutdown" + + return None + + def perform_return_action(self): + self._menu_controller.show_timed_mini_message("") + self._menu_controller.show_timed_message("") + + return "return" diff --git a/python/ctrlboard/src/ctrlboard_hw/__init__.py b/python/ctrlboard/src/ctrlboard_hw/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/python/ctrlboard/src/ctrlboard_hw/ctrlboard_hw.py b/python/ctrlboard/src/ctrlboard_hw/ctrlboard_hw.py new file mode 100644 index 00000000..6600d7f0 --- /dev/null +++ b/python/ctrlboard/src/ctrlboard_hw/ctrlboard_hw.py @@ -0,0 +1,227 @@ +"""Module providing the interface to the RaSCSI Control Board hardware""" +# noinspection PyUnresolvedReferences +import logging +import RPi.GPIO as GPIO +import numpy +import smbus + +from ctrlboard_hw import pca9554multiplexer +from ctrlboard_hw.hardware_button import HardwareButton +from ctrlboard_hw.ctrlboard_hw_constants import CtrlBoardHardwareConstants +from ctrlboard_hw.encoder import Encoder +from ctrlboard_hw.pca9554multiplexer import PCA9554Multiplexer +from observable import Observable + + +# pylint: disable=too-many-instance-attributes +class CtrlBoardHardware(Observable): + """Class implements the RaSCSI Control Board hardware and provides an interface to it.""" + def __init__(self, display_i2c_address, pca9554_i2c_address): + self.display_i2c_address = display_i2c_address + self.pca9554_i2c_address = pca9554_i2c_address + self.rascsi_controlboard_detected = self.detect_rascsi_controlboard() + log = logging.getLogger(__name__) + log.info("RaSCSI Control Board detected: %s", str(self.rascsi_controlboard_detected)) + self.display_detected = self.detect_display() + log.info("Display detected: %s", str(self.display_detected)) + + if self.rascsi_controlboard_detected is False: + return + + self.pos = 0 + self.pca_driver = pca9554multiplexer.PCA9554Multiplexer(self.pca9554_i2c_address) + + # setup pca9554 + self.pca_driver.write_configuration_register_port(CtrlBoardHardwareConstants. + PCA9554_PIN_ENC_A, + PCA9554Multiplexer.PIN_ENABLED_AS_INPUT) + self.pca_driver.write_configuration_register_port(CtrlBoardHardwareConstants. + PCA9554_PIN_ENC_B, + PCA9554Multiplexer.PIN_ENABLED_AS_INPUT) + self.pca_driver.write_configuration_register_port(CtrlBoardHardwareConstants. + PCA9554_PIN_BUTTON_1, + PCA9554Multiplexer.PIN_ENABLED_AS_INPUT) + self.pca_driver.write_configuration_register_port(CtrlBoardHardwareConstants. + PCA9554_PIN_BUTTON_2, + PCA9554Multiplexer.PIN_ENABLED_AS_INPUT) + self.pca_driver.write_configuration_register_port(CtrlBoardHardwareConstants. + PCA9554_PIN_BUTTON_ROTARY, + PCA9554Multiplexer.PIN_ENABLED_AS_INPUT) + self.pca_driver.write_configuration_register_port(CtrlBoardHardwareConstants. + PCA9554_PIN_LED_1, + PCA9554Multiplexer.PIN_ENABLED_AS_OUTPUT) + self.pca_driver.write_configuration_register_port(CtrlBoardHardwareConstants. + PCA9554_PIN_LED_2, + PCA9554Multiplexer.PIN_ENABLED_AS_OUTPUT) + self.input_register_buffer = numpy.uint32(0) + + # pylint: disable=no-member + GPIO.setmode(GPIO.BCM) + GPIO.setup(CtrlBoardHardwareConstants.PI_PIN_INTERRUPT, GPIO.IN) + GPIO.add_event_detect(CtrlBoardHardwareConstants.PI_PIN_INTERRUPT, GPIO.FALLING, + callback=self.button_pressed_callback) + + # configure button of the rotary encoder + self.rotary_button = HardwareButton(self.pca_driver, + CtrlBoardHardwareConstants.PCA9554_PIN_BUTTON_ROTARY) + self.rotary_button.state = True + self.rotary_button.name = CtrlBoardHardwareConstants.ROTARY_BUTTON + + # configure button 1 + self.button1 = HardwareButton(self.pca_driver, + CtrlBoardHardwareConstants.PCA9554_PIN_BUTTON_1) + self.button1.state = True + self.button1.name = CtrlBoardHardwareConstants.BUTTON_1 + + # configure button 2 + self.button2 = HardwareButton(self.pca_driver, + CtrlBoardHardwareConstants.PCA9554_PIN_BUTTON_2) + self.button2.state = True + self.button2.name = CtrlBoardHardwareConstants.BUTTON_2 + + # configure rotary encoder pin a + self.rotary_a = HardwareButton(self.pca_driver, + CtrlBoardHardwareConstants.PCA9554_PIN_ENC_A) + self.rotary_a.state = True + self.rotary_a.directionalTransition = False + self.rotary_a.name = CtrlBoardHardwareConstants.ROTARY_A + + # configure rotary encoder pin b + self.rotary_b = HardwareButton(self.pca_driver, + CtrlBoardHardwareConstants.PCA9554_PIN_ENC_B) + self.rotary_b.state = True + self.rotary_b.directionalTransition = False + self.rotary_b.name = CtrlBoardHardwareConstants.ROTARY_B + + # configure encoder object + self.rotary = Encoder(self.rotary_a, self.rotary_b) + self.rotary.pos_prev = 0 + self.rotary.name = CtrlBoardHardwareConstants.ROTARY + + # noinspection PyUnusedLocal + # pylint: disable=unused-argument + def button_pressed_callback(self, channel): + """Method is called when a button is pressed and reads the corresponding input register.""" + input_register = self.pca_driver.read_input_register() + self.input_register_buffer <<= 8 + self.input_register_buffer |= input_register + + def check_button_press(self, button): + """Checks whether the button state has changed.""" + if button.state_interrupt is True: + return + + value = button.state_interrupt + + if value != button.state and value is False: + button.state = False + self.notify(button) + button.state = True + button.state_interrupt = True + + def check_rotary_encoder(self, rotary): + """Checks whether the rotary state has changed.""" + rotary.update() + if self.rotary.pos_prev != self.rotary.pos: + self.notify(rotary) + self.rotary.pos_prev = self.rotary.pos + + # noinspection PyMethodMayBeStatic + @staticmethod + def button_value(input_register_buffer, bit): + """Method reads the button value from a specific bit in the input register.""" + tmp = input_register_buffer + bitmask = 1 << bit + tmp &= bitmask + tmp >>= bit + return tmp + + # noinspection PyMethodMayBeStatic + @staticmethod + def button_value_shifted_list(input_register_buffer, bit): + """Helper method for dealing with multiple buffered input registers""" + input_register_buffer_length = int(len(format(input_register_buffer, 'b'))/8) + shiftval = (input_register_buffer_length-1)*8 + tmp = input_register_buffer >> shiftval + bitmask = 1 << bit + tmp &= bitmask + tmp >>= bit + return tmp + + def process_events(self): + """Non-blocking event processor for hardware events (button presses etc.)""" + input_register_buffer_length = int(len(format(self.input_register_buffer, 'b'))/8) + if input_register_buffer_length <= 1: + return + + input_register_buffer = self.input_register_buffer + self.input_register_buffer = 0 + + for i in range(0, input_register_buffer_length): + shiftval = (input_register_buffer_length-1-i)*8 + input_register = (input_register_buffer >> shiftval) & 0b11111111 + + rot_a = self.button_value(input_register, 0) + rot_b = self.button_value(input_register, 1) + button_rotary = self.button_value(input_register, 5) + button_1 = self.button_value(input_register, 2) + button_2 = self.button_value(input_register, 3) + + if button_1 == 0: + self.button1.state_interrupt = bool(button_1) + + if button_2 == 0: + self.button2.state_interrupt = bool(button_2) + + if button_rotary == 0: + self.rotary_button.state_interrupt = bool(button_rotary) + + if rot_a == 0: + self.rotary.enc_a.state_interrupt = bool(rot_a) + + if rot_b == 0: + self.rotary.enc_b.state_interrupt = bool(rot_b) + + self.check_button_press(self.rotary_button) + self.check_button_press(self.button1) + self.check_button_press(self.button2) + self.check_rotary_encoder(self.rotary) + + self.rotary.state = 0b11 + self.input_register_buffer = 0 + + @staticmethod + def detect_i2c_devices(_bus): + """Method finds addresses on the i2c bus""" + detected_i2c_addresses = [] + + for _address in range(128): + if 2 < _address < 120: + try: + _bus.read_byte(_address) + address = '%02x' % _address + detected_i2c_addresses.append(int(address, base=16)) + except IOError: # simply skip unsuccessful i2c probes + pass + + return detected_i2c_addresses + + def detect_rascsi_controlboard(self): + """Detects whether the RaSCSI Control Board is attached by checking whether + the expected i2c addresses are detected.""" + # pylint: disable=c-extension-no-member + i2c_addresses = self.detect_i2c_devices(smbus.SMBus(1)) + return bool((int(self.display_i2c_address) in i2c_addresses and + (int(self.pca9554_i2c_address) in i2c_addresses))) + + def detect_display(self): + """Detects whether an i2c display is connected to the RaSCSI hat.""" + # pylint: disable=c-extension-no-member + i2c_addresses = self.detect_i2c_devices(smbus.SMBus(1)) + return bool(int(self.display_i2c_address) in i2c_addresses) + + @staticmethod + def cleanup(): + """Resets pin and interrupt settings on the pins.""" + # pylint: disable=no-member + GPIO.cleanup() diff --git a/python/ctrlboard/src/ctrlboard_hw/ctrlboard_hw_constants.py b/python/ctrlboard/src/ctrlboard_hw/ctrlboard_hw_constants.py new file mode 100644 index 00000000..d1c4606e --- /dev/null +++ b/python/ctrlboard/src/ctrlboard_hw/ctrlboard_hw_constants.py @@ -0,0 +1,24 @@ +"""Module containing the RaSCSI Control Board hardware constants""" + + +# pylint: disable=too-few-public-methods +class CtrlBoardHardwareConstants: + """Class containing the RaSCSI Control Board hardware constants""" + DISPLAY_I2C_ADDRESS = 0x3c + PCA9554_I2C_ADDRESS = 0x3f + PCA9554_PIN_ENC_A = 0 + PCA9554_PIN_ENC_B = 1 + PCA9554_PIN_BUTTON_1 = 2 + PCA9554_PIN_BUTTON_2 = 3 + PCA9554_PIN_BUTTON_ROTARY = 5 + PCA9554_PIN_LED_1 = 6 + PCA9554_PIN_LED_2 = 7 + + PI_PIN_INTERRUPT = 9 # BCM + + BUTTON_1 = "Bt1" + BUTTON_2 = "Bt2" + ROTARY_A = "RotA" + ROTARY_B = "RotB" + ROTARY_BUTTON = "RotBtn" + ROTARY = "Rot" diff --git a/python/ctrlboard/src/ctrlboard_hw/encoder.py b/python/ctrlboard/src/ctrlboard_hw/encoder.py new file mode 100644 index 00000000..0af34d7a --- /dev/null +++ b/python/ctrlboard/src/ctrlboard_hw/encoder.py @@ -0,0 +1,66 @@ +"""Module containing an implementation for reading the rotary encoder directions through +the i2c multiplexer + interrupt""" +from ctrlboard_hw.hardware_button import HardwareButton + + +class Encoder: + """Class implementing a detection mechanism to detect the rotary encoder directions + through the i2c multiplexer + interrupt""" + def __init__(self, enc_a: HardwareButton, enc_b: HardwareButton): + self.enc_a = enc_a + self.enc_b = enc_b + self.pos = 0 + self.state = 0b0011 + self.direction = 0 + + def update(self): + """Updates the internal attributes wrt. to the encoder position and direction.""" + self.update2() + + def update2(self): + """Primary method for detecting the direction""" + value_enc_a = self.enc_a.state_interrupt + value_enc_b = self.enc_b.state_interrupt + + self.direction = 0 + state = self.state & 0b0011 + + if value_enc_a: + state |= 0b0100 + if value_enc_b: + state |= 0b1000 + + if state == 0b1011: + self.pos += 1 + self.direction = 1 + + if state == 0b0111: + self.pos -= 1 + self.direction = -1 + + self.state = state >> 2 + + self.enc_a.state_interrupt = True + self.enc_b.state_interrupt = True + + def update1(self): + """Secondary, less well working method to detect the direction""" + if self.enc_a.state_interrupt is True and self.enc_b.state_interrupt is True: + return + + if self.enc_a.state_interrupt is False and self.enc_b.state_interrupt is False: + self.enc_a.state_interrupt = True + self.enc_b.state_interrupt = True + return + + self.direction = 0 + + if self.enc_a.state_interrupt is False: + self.pos += 1 + self.direction = 1 + elif self.enc_a.state_interrupt is True: + self.pos -= 1 + self.direction = -1 + + self.enc_a.state_interrupt = True + self.enc_b.state_interrupt = True diff --git a/python/ctrlboard/src/ctrlboard_hw/hardware_button.py b/python/ctrlboard/src/ctrlboard_hw/hardware_button.py new file mode 100644 index 00000000..43268f68 --- /dev/null +++ b/python/ctrlboard/src/ctrlboard_hw/hardware_button.py @@ -0,0 +1,17 @@ +"""Module containing an abstraction for the hardware button through the i2c multiplexer""" + + +# pylint: disable=too-few-public-methods +class HardwareButton: + """Class implementing a hardware button interface that uses the i2c multiplexer""" + + def __init__(self, pca_driver, pin): + self.pca_driver = pca_driver + self.pin = pin + self.state = True + self.state_interrupt = True + self.name = "n/a" + + def read(self): + """Reads the configured port of the i2c multiplexer""" + return self.pca_driver.read_input_register_port(self.pin) diff --git a/python/ctrlboard/src/ctrlboard_hw/pca9554multiplexer.py b/python/ctrlboard/src/ctrlboard_hw/pca9554multiplexer.py new file mode 100644 index 00000000..bf25fad6 --- /dev/null +++ b/python/ctrlboard/src/ctrlboard_hw/pca9554multiplexer.py @@ -0,0 +1,64 @@ +""" +Module for interfacting with the pca9554 multiplexer +""" +# pylint: disable=c-extension-no-member +import logging +import smbus + + +class PCA9554Multiplexer: + """Class interfacing with the pca9554 multiplexer""" + + PIN_ENABLED_AS_OUTPUT = 0 + PIN_ENABLED_AS_INPUT = 1 + + def __init__(self, i2c_address): + """Constructor for the pc9554 multiplexer interface class""" + self.i2c_address = i2c_address + + try: + self.i2c_bus = smbus.SMBus(1) + if self.read_input_register() is None: + logging.error("PCA9554 initialization test on specified i2c address %s failed", + self.i2c_address) + self.i2c_bus = None + except IOError: + logging.error("Could not open the i2c bus.") + self.i2c_bus = None + + def write_configuration_register_port(self, port_bit, bit_value): + """Reconfigures the configuration register. Updates the specified + port bit with bit_value. Returns true if successful, false otherwise.""" + try: + if (0 <= port_bit <= 8) and (0 <= bit_value <= 1): + configuration_register = self.i2c_bus.read_byte_data(self.i2c_address, 3) + if bit_value: + updated_configuration_register = configuration_register | (1 << port_bit) + else: + updated_configuration_register = configuration_register & (0xFF - + (1 << port_bit)) + self.i2c_bus.write_byte_data(self.i2c_address, 3, updated_configuration_register) + return True + + return False + except IOError: + return False + + def read_input_register(self): + """Reads the complete 8 bit input port register from pca9554""" + try: + return self.i2c_bus.read_byte_data(self.i2c_address, 0) + except IOError: + return None + + def read_input_register_port(self, port_bit): + """Reads the input port register and returns the logic level of one specific port in the + argument""" + try: + if 0 <= port_bit <= 8: + input_register = self.i2c_bus.read_byte_data(self.i2c_address, 0) + return (input_register >> port_bit) & 1 + + return None + except IOError: + return None diff --git a/python/ctrlboard/src/ctrlboard_menu_builder.py b/python/ctrlboard/src/ctrlboard_menu_builder.py new file mode 100644 index 00000000..160eb2bd --- /dev/null +++ b/python/ctrlboard/src/ctrlboard_menu_builder.py @@ -0,0 +1,185 @@ +"""Module for building the control board UI specific menus""" +import logging + +from menu.menu import Menu +from menu.menu_builder import MenuBuilder +from rascsi.file_cmds import FileCmds +from rascsi.ractl_cmds import RaCtlCmds + + +class CtrlBoardMenuBuilder(MenuBuilder): + """Class fgor building the control board UI specific menus""" + SCSI_ID_MENU = "scsi_id_menu" + ACTION_MENU = "action_menu" + IMAGES_MENU = "images_menu" + PROFILES_MENU = "profiles_menu" + DEVICEINFO_MENU = "device_info_menu" + + ACTION_OPENACTIONMENU = "openactionmenu" + ACTION_RETURN = "return" + ACTION_SLOT_ATTACHINSERT = "slot_attachinsert" + ACTION_SLOT_DETACHEJECT = "slot_detacheject" + ACTION_SLOT_INFO = "slot_info" + ACTION_SHUTDOWN = "shutdown" + ACTION_LOADPROFILE = "loadprofile" + ACTION_IMAGE_ATTACHINSERT = "image_attachinsert" + + def __init__(self, ractl_cmd: RaCtlCmds): + super().__init__() + self._rascsi_client = ractl_cmd + self.file_cmd = FileCmds(sock_cmd=ractl_cmd.sock_cmd, ractl=ractl_cmd, + token=ractl_cmd.token, locale=ractl_cmd.locale) + + def build(self, name: str, context_object=None) -> Menu: + if name == CtrlBoardMenuBuilder.SCSI_ID_MENU: + return self.create_scsi_id_list_menu(context_object) + if name == CtrlBoardMenuBuilder.ACTION_MENU: + return self.create_action_menu(context_object) + if name == CtrlBoardMenuBuilder.IMAGES_MENU: + return self.create_images_menu(context_object) + if name == CtrlBoardMenuBuilder.PROFILES_MENU: + return self.create_profiles_menu(context_object) + if name == CtrlBoardMenuBuilder.DEVICEINFO_MENU: + return self.create_device_info_menu(context_object) + + log = logging.getLogger(__name__) + log.warning("Provided menu name [%s] cannot be built!", name) + + return self.create_scsi_id_list_menu(context_object) + + # pylint: disable=unused-argument + def create_scsi_id_list_menu(self, context_object=None): + """Method creates the menu displaying the 7 scsi slots""" + devices = self._rascsi_client.list_devices() + reserved_ids = self._rascsi_client.get_reserved_ids() + + devices_by_id = {} + for device in devices["device_list"]: + devices_by_id[int(device["id"])] = device + + menu = Menu(CtrlBoardMenuBuilder.SCSI_ID_MENU) + + if reserved_ids["status"] is False: + menu.add_entry("No scsi ids reserved") + + for scsi_id in range(0, 8): + device = None + if devices_by_id.get(scsi_id) is not None: + device = devices_by_id[scsi_id] + file = "-" + device_type = "" + + if str(scsi_id) in reserved_ids["ids"]: + file = "[Reserved]" + elif device is not None: + file = str(device["file"]) + device_type = str(device["device_type"]) + + menu_str = str(scsi_id) + ":" + + if device_type == "SCDP": + menu_str += "Daynaport" + elif device_type == "SCBR": + menu_str += "X68000 Host Bridge" + elif device_type == "SCLP": + menu_str += "SCSI Printer" + elif device_type == "SCHS": + menu_str += "Host Services" + else: + if file == "": + menu_str += "(empty)" + else: + menu_str += file + if device_type != "": + menu_str += " [" + device_type + "]" + + menu.add_entry(menu_str, {"context": self.SCSI_ID_MENU, + "action": self.ACTION_OPENACTIONMENU, + "scsi_id": scsi_id}) + + return menu + + # noinspection PyMethodMayBeStatic + def create_action_menu(self, context_object=None): + """Method creates the action submenu with action that can be performed on a scsi slot""" + menu = Menu(CtrlBoardMenuBuilder.ACTION_MENU) + menu.add_entry("Return", {"context": self.ACTION_MENU, + "action": self.ACTION_RETURN}) + menu.add_entry("Attach/Insert", {"context": self.ACTION_MENU, + "action": self.ACTION_SLOT_ATTACHINSERT}) + menu.add_entry("Detach/Eject", {"context": self.ACTION_MENU, + "action": self.ACTION_SLOT_DETACHEJECT}) + menu.add_entry("Info", {"context": self.ACTION_MENU, + "action": self.ACTION_SLOT_INFO}) + menu.add_entry("Load Profile", {"context": self.ACTION_MENU, + "action": self.ACTION_LOADPROFILE}) + menu.add_entry("Shutdown", {"context": self.ACTION_MENU, + "action": self.ACTION_SHUTDOWN}) + return menu + + def create_images_menu(self, context_object=None): + """Creates a sub menu showing all the available images""" + menu = Menu(CtrlBoardMenuBuilder.IMAGES_MENU) + images_info = self.file_cmd.list_images() + menu.add_entry("Return", {"context": self.IMAGES_MENU, "action": self.ACTION_RETURN}) + images = images_info["files"] + sorted_images = sorted(images, key=lambda d: d['name']) + for image in sorted_images: + image_str = image["name"] + " [" + image["detected_type"] + "]" + image_context = {"context": self.IMAGES_MENU, "name": str(image["name"]), + "device_type": str(image["detected_type"]), + "action": self.ACTION_IMAGE_ATTACHINSERT} + menu.add_entry(image_str, image_context) + return menu + + def create_profiles_menu(self, context_object=None): + """Creates a sub menu showing all the available profiles""" + menu = Menu(CtrlBoardMenuBuilder.PROFILES_MENU) + menu.add_entry("Return", {"context": self.IMAGES_MENU, "action": self.ACTION_RETURN}) + config_files = self.file_cmd.list_config_files() + for config_file in config_files: + menu.add_entry(str(config_file), + {"context": self.PROFILES_MENU, "name": str(config_file), + "action": self.ACTION_LOADPROFILE}) + + return menu + + def create_device_info_menu(self, context_object=None): + """Create a menu displaying information of an image in a scsi slot""" + menu = Menu(CtrlBoardMenuBuilder.DEVICEINFO_MENU) + menu.add_entry("Return", {"context": self.DEVICEINFO_MENU, "action": self.ACTION_RETURN}) + + device_info = self._rascsi_client.list_devices(context_object["scsi_id"]) + + if not device_info["device_list"]: + return menu + + scsi_id = context_object["scsi_id"] + file = device_info["device_list"][0]["file"] + status = device_info["device_list"][0]["status"] + if not status: + status = "Read/Write" + lun = device_info["device_list"][0]["unit"] + device_type = device_info["device_list"][0]["device_type"] + if "parameters" in device_info["device_list"][0]: + parameters = device_info["device_list"][0]["parameters"] + else: + parameters = "{}" + + menu.add_entry("ID : " + str(scsi_id)) + menu.add_entry("LUN : " + str(lun)) + menu.add_entry("File : " + str(file)) + menu.add_entry("Type : " + str(device_type)) + menu.add_entry("R/RW : " + str(status)) + menu.add_entry("Prms : " + str(parameters)) + menu.add_entry("Vndr : " + str(device_info["device_list"][0]["vendor"])) + menu.add_entry("Prdct: " + str(device_info["device_list"][0]["product"])) + menu.add_entry("Rvisn: " + str(device_info["device_list"][0]["revision"])) + menu.add_entry("Blksz: " + str(device_info["device_list"][0]["block_size"])) + menu.add_entry("Imgsz: " + str(device_info["device_list"][0]["size"])) + + return menu + + def get_rascsi_client(self): + """Returns an instance of the rascsi client""" + return self._rascsi_client diff --git a/python/ctrlboard/src/main.py b/python/ctrlboard/src/main.py new file mode 100644 index 00000000..403a7923 --- /dev/null +++ b/python/ctrlboard/src/main.py @@ -0,0 +1,188 @@ +"""Module is the entry point for the RaSCSI Control Board UI""" +import argparse +import sys +import logging + +from config import CtrlboardConfig +from ctrlboard_hw.ctrlboard_hw import CtrlBoardHardware +from ctrlboard_hw.ctrlboard_hw_constants import CtrlBoardHardwareConstants +from ctrlboard_event_handler.ctrlboard_menu_update_event_handler \ + import CtrlBoardMenuUpdateEventHandler +from ctrlboard_menu_builder import CtrlBoardMenuBuilder +from menu.menu_renderer_config import MenuRendererConfig +from menu.menu_renderer_luma_oled import MenuRendererLumaOled +from rascsi.exceptions import (EmptySocketChunkException, + InvalidProtobufResponse, + FailedSocketConnectionException) +from rascsi.ractl_cmds import RaCtlCmds +from rascsi.socket_cmds import SocketCmds + +from rascsi_menu_controller import RascsiMenuController + + +def parse_config(): + """Parses the command line parameters and configured the RaSCSI Control Board UI accordingly""" + config = CtrlboardConfig() + cmdline_args_parser = argparse.ArgumentParser(description='RaSCSI ctrlboard service') + cmdline_args_parser.add_argument( + "--rotation", + type=int, + choices=[0, 90, 180, 270], + default=180, + action="store", + help="The rotation of the screen buffer in degrees. Default: 180", + ) + cmdline_args_parser.add_argument( + "--height", + type=int, + choices=[64], + default=64, + action="store", + help="The pixel height of the screen buffer. Default: 64", + ) + cmdline_args_parser.add_argument( + "--rascsi-host", + type=str, + default="localhost", + action="store", + help="RaSCSI host. Default: localhost", + ) + cmdline_args_parser.add_argument( + "--rascsi-port", + type=int, + default=6868, + action="store", + help="RaSCSI port. Default: 6868", + ) + cmdline_args_parser.add_argument( + "--password", + type=str, + default="", + action="store", + help="Token password string for authenticating with RaSCSI", + ) + cmdline_args_parser.add_argument( + "--loglevel", + type=int, + choices=[0, 10, 30, 40, 50], + default=logging.WARN, + action="store", + help="Loglevel. Valid values: 0 (notset), 10 (debug), 30 (warning), " + "40 (error), 50 (critical). Default: Warning", + ) + cmdline_args_parser.add_argument( + "--transitions", + type=int, + choices=[0, 1], + default=1, + action="store", + help="Transition animations. Valid values: 0 (disabled), 1 (enabled). Default: 1", + ) + args = cmdline_args_parser.parse_args() + config.ROTATION = args.rotation + + if args.height == 64: + config.HEIGHT = 64 + config.LINES = 8 + elif args.height == 32: + config.HEIGHT = 32 + config.LINES = 4 + config.TOKEN = args.password + config.WIDTH = 128 + config.BORDER = 5 + config.RASCSI_HOST = args.rascsi_host + config.RASCSI_PORT = args.rascsi_port + config.LOG_LEVEL = args.loglevel + config.TRANSITIONS = bool(args.transitions) + + return config + + +def check_rascsi_connection(ractl_cmd): + """Checks whether a RaSCSI connection exists by polling the RaSCSI server info. + Returns true if connection works, false if connection fails.""" + try: + info = ractl_cmd.get_reserved_ids() + return bool(info["status"] is True) + except FailedSocketConnectionException: + log = logging.getLogger(__name__) + log.error("Could not establish connection. Stopping service") + exit(1) + + +def main(): + """Main function for the RaSCSI Control Board UI""" + config = parse_config() + + log_format = "%(asctime)s:%(name)s:%(levelname)s - %(message)s" + logging.basicConfig(stream=sys.stdout, + format=log_format, + level=config.LOG_LEVEL) + log = logging.getLogger(__name__) + log.debug("RaSCSI ctrlboard service started.") + + ctrlboard_hw = CtrlBoardHardware(display_i2c_address=config.DISPLAY_I2C_ADDRESS, + pca9554_i2c_address=config.PCA9554_I2C_ADDRESS) + + # for now, we require the complete rascsi ctrlboard hardware. + # Oled only will be supported as well at some later point in time. + if ctrlboard_hw.rascsi_controlboard_detected is False: + log.error("Ctrlboard hardware not detected. Stopping service") + exit(1) + + sock_cmd = None + ractl_cmd = None + try: + sock_cmd = SocketCmds(host=config.RASCSI_HOST, port=config.RASCSI_PORT) + ractl_cmd = RaCtlCmds(sock_cmd=sock_cmd, token=config.TOKEN) + except EmptySocketChunkException: + log.error("Retrieved empty data from RaSCSI. Stopping service") + exit(1) + except InvalidProtobufResponse: + log.error("Retrieved unexpected data from RaSCSI. Stopping service") + exit(1) + + if check_rascsi_connection(ractl_cmd) is False: + log.error("Communication with RaSCSI failed. Please check if password token must be set " + "and whether is set correctly.") + exit(1) + + menu_renderer_config = MenuRendererConfig() + + if config.TRANSITIONS is False: + menu_renderer_config.transition = None + + menu_renderer_config.i2c_address = CtrlBoardHardwareConstants.DISPLAY_I2C_ADDRESS + menu_renderer_config.rotation = config.ROTATION + + menu_builder = CtrlBoardMenuBuilder(ractl_cmd) + menu_controller = RascsiMenuController(config.MENU_REFRESH_INTERVAL, menu_builder=menu_builder, + menu_renderer=MenuRendererLumaOled(menu_renderer_config), + menu_renderer_config=menu_renderer_config) + + menu_controller.add(CtrlBoardMenuBuilder.SCSI_ID_MENU) + menu_controller.add(CtrlBoardMenuBuilder.ACTION_MENU) + + menu_controller.show_splash_screen(f"resources/splash_start_64.bmp") + + menu_update_event_handler = CtrlBoardMenuUpdateEventHandler(menu_controller, + sock_cmd=sock_cmd, + ractl_cmd=ractl_cmd) + ctrlboard_hw.attach(menu_update_event_handler) + menu_controller.set_active_menu(CtrlBoardMenuBuilder.SCSI_ID_MENU) + + while True: + # pylint: disable=broad-except + try: + ctrlboard_hw.process_events() + menu_update_event_handler.update_events() + menu_controller.update() + except KeyboardInterrupt: + ctrlboard_hw.cleanup() + break + except Exception as ex: + print(ex) + + +if __name__ == '__main__': + main() diff --git a/python/ctrlboard/src/menu/__init__.py b/python/ctrlboard/src/menu/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/python/ctrlboard/src/menu/blank_screensaver.py b/python/ctrlboard/src/menu/blank_screensaver.py new file mode 100644 index 00000000..3f253f18 --- /dev/null +++ b/python/ctrlboard/src/menu/blank_screensaver.py @@ -0,0 +1,26 @@ +"""Module implementing a blank screensaver""" +from menu.screensaver import ScreenSaver + + +class BlankScreenSaver(ScreenSaver): + """Class implementing a blank screen safer that simply blanks the screen after a + configured activation delay""" + def __init__(self, activation_delay, menu_renderer): + super().__init__(activation_delay, menu_renderer) + self._initial_draw_call = None + + def draw_screensaver(self): + if self._initial_draw_call is True: + self.menu_renderer.blank_screen() + else: + self._initial_draw_call = False + + def check_timer(self): + already_enabled = False + if self.enabled is True: + already_enabled = True + + super().check_timer() + + if self.enabled is True and already_enabled is False: # new switch to screensaver + self._initial_draw_call = True diff --git a/python/ctrlboard/src/menu/cycler.py b/python/ctrlboard/src/menu/cycler.py new file mode 100644 index 00000000..f1744227 --- /dev/null +++ b/python/ctrlboard/src/menu/cycler.py @@ -0,0 +1,82 @@ +"""Module that implements a button cycling functionality""" +from abc import abstractmethod +from menu.timer import Timer +from rascsi.file_cmds import FileCmds + + +class Cycler: + """Class implementing button cycling functionality. Message is shown at the center of + the screen where repeated button presses cycle through the available selection + possibilities. Inactivity (cycle_timeout) actives cycle entry last shown on the screen.""" + def __init__(self, menu_controller, sock_cmd, ractl_cmd, + cycle_timeout=3, return_string="Return ->", + return_entry=True, empty_messages=True): + self._cycle_profile_timer_flag = Timer(activation_delay=cycle_timeout) + self._menu_controller = menu_controller + self.sock_cmd = sock_cmd + self.ractl_cmd = ractl_cmd + self.file_cmd = FileCmds(sock_cmd=self.sock_cmd, ractl=self.ractl_cmd) + self.cycle_entries = self.populate_cycle_entries() + self.return_string = return_string + self.return_entry = return_entry + self.empty_messages = empty_messages + if self.return_entry is True: + self.cycle_entries.insert(0, self.return_string) + self.selected_config_file_index = 0 + self.message = str(self.cycle_entries[self.selected_config_file_index]) + self.update() + + @abstractmethod + def populate_cycle_entries(self): + """Returns a list of entries to cycle""" + + @abstractmethod + def perform_selected_entry_action(self, selected_entry): + """Performs an action on the selected cycle menu entry""" + + @abstractmethod + def perform_return_action(self): + """Perform the return action, i.e., when no selection is chosen""" + + def update(self): + """ Returns True if object has completed its task and can be deleted """ + + if self._cycle_profile_timer_flag is None: + return None + + self._cycle_profile_timer_flag.check_timer() + if self.message is not None: + self._menu_controller.show_timed_mini_message(self.message) + self._menu_controller.get_menu_renderer().render() + + if self._cycle_profile_timer_flag.enabled is False: # timer is running + return None + + selected_cycle_entry = str(self.cycle_entries[self.selected_config_file_index]) + + if self.return_entry is True: + if selected_cycle_entry != self.return_string: + if self.empty_messages is True: + self._menu_controller.show_timed_mini_message("") + self._menu_controller.show_timed_message("") + return self.perform_selected_entry_action(selected_cycle_entry) + + self._menu_controller.show_timed_mini_message("") + self._menu_controller.show_timed_message("") + return self.perform_return_action() + + return self.perform_selected_entry_action(selected_cycle_entry) + + def cycle(self): + """Cycles between entries in the cycle menu""" + if self._cycle_profile_timer_flag is None: + return + + self.selected_config_file_index += 1 + + if self.selected_config_file_index > len(self.cycle_entries) - 1: + self.selected_config_file_index = 0 + + self.message = str(self.cycle_entries[self.selected_config_file_index]) + + self._cycle_profile_timer_flag.reset_timer() diff --git a/python/ctrlboard/src/menu/menu.py b/python/ctrlboard/src/menu/menu.py new file mode 100644 index 00000000..17bbdafb --- /dev/null +++ b/python/ctrlboard/src/menu/menu.py @@ -0,0 +1,30 @@ +"""Module for creating a menu""" +from typing import List + + +class Menu: + """Class implement the Menu class""" + def __init__(self, name: str): + self.entries: List = [] + self.item_selection = 0 + self.name = name + self.context_object = None + + def add_entry(self, text, data_object=None): + """Adds an entry to a menu""" + entry = {"text": text, "data_object": data_object} + self.entries.append(entry) + + def get_current_text(self): + """Returns the text content of the currently selected text in the menu.""" + return self.entries[self.item_selection]['text'] + + def get_current_info_object(self): + """Returns the data object to the currently selected menu item""" + return self.entries[self.item_selection]['data_object'] + + def __repr__(self): + print("entries: " + str(self.entries)) + print("item_selection: " + str(self.item_selection)) + print("name: " + self.name) + print("context object: " + str(self.context_object)) diff --git a/python/ctrlboard/src/menu/menu_builder.py b/python/ctrlboard/src/menu/menu_builder.py new file mode 100644 index 00000000..6bf680e4 --- /dev/null +++ b/python/ctrlboard/src/menu/menu_builder.py @@ -0,0 +1,14 @@ +"""Module for creating menus""" +from abc import ABC, abstractmethod +from menu.menu import Menu + + +# pylint: disable=too-few-public-methods +class MenuBuilder(ABC): + """Base class for menu builders""" + def __init__(self): + pass + + @abstractmethod + def build(self, name: str, context_object=None) -> Menu: + """builds a menu and gives it a name and a context object""" diff --git a/python/ctrlboard/src/menu/menu_controller.py b/python/ctrlboard/src/menu/menu_controller.py new file mode 100644 index 00000000..6fe03c15 --- /dev/null +++ b/python/ctrlboard/src/menu/menu_controller.py @@ -0,0 +1,145 @@ +"""Module providing the menu controller.""" +import time +import importlib +from typing import Dict, Optional +from PIL import Image + +from menu.menu import Menu +from menu.menu_builder import MenuBuilder +from menu.menu_renderer_config import MenuRendererConfig +from menu.menu_renderer_luma_oled import MenuRendererLumaOled +from menu.transition import Transition + + +class MenuController: + """Class providing the menu controller. The menu controller is a central class + that controls the menu and its associated rendering to a screen.""" + + def __init__(self, menu_builder: MenuBuilder, menu_renderer=None, menu_renderer_config=None): + self._menus: Dict[str, Menu] = {} + self._active_menu: Optional[Menu] = None + self._menu_renderer = menu_renderer + self._menu_builder: MenuBuilder = menu_builder + self._menu_renderer_config: Optional[MenuRendererConfig] + if menu_renderer_config is None: + self._menu_renderer_config = MenuRendererConfig() + else: + self._menu_renderer_config = menu_renderer_config + + if menu_renderer is None: # default to LumaOled renderer if nothing else is stated + self._menu_renderer = MenuRendererLumaOled(self._menu_renderer_config) + else: + self._menu_renderer = menu_renderer + self._transition: Optional[Transition] = None + if self._menu_renderer_config.transition is None: + self._transition = None + return + try: + module = importlib.import_module("menu.transition") + try: + transition_class = getattr(module, self._menu_renderer_config.transition) + if transition_class is not None: + self._transition = transition_class(self._menu_renderer.disp) + except AttributeError: + pass + except ImportError: + print("transition module does not exist. Falling back to default.") + self._transition = None + + def add(self, name: str, context_object=None): + """Adds a menu to the menu collection internal to the controller by name. + The associated class menu builder builds the menu by name.""" + self._menus[name] = self._menu_builder.build(name) + if context_object is not None: + self._menus[name].context_object = context_object + + def set_active_menu(self, name: str, display_on_device=True): + """Activates a menu from the controller internal menu collection by name.""" + self._active_menu = self._menus[name] + self._menu_renderer.set_menu(self._active_menu) + self._menu_renderer.render(display_on_device) + + def refresh(self, name: str, context_object=None): + """Refreshes a menu by name (by calling the menu builder again to rebuild the menu).""" + item_selection = None + if self._menus.get(name) is not None: + item_selection = self._menus[name].item_selection + self._menus[name] = self._menu_builder.build(name, context_object) + + if context_object is not None: + self._menus[name].context_object = context_object + + if item_selection is not None: + self._menus[name].item_selection = item_selection + + def get_menu(self, name: str): + """Returns the controller internal menu collection""" + return self._menus[name] + + def get_active_menu(self): + """Returns the currently activated menu""" + return self._active_menu + + def get_menu_renderer_config(self): + """Returns the menu renderer configuration""" + return self._menu_renderer_config + + def get_menu_renderer(self): + """Returns the menu renderer for this menu controller""" + return self._menu_renderer + + def segue(self, name, context_object=None, transition_attributes=None): + """Transitions one menu into the other with all associated actions such + as transition animations.""" + self.get_active_menu().context_object = None + self.refresh(name, context_object) + + if self._transition is not None and transition_attributes is not None: + source_image = self._menu_renderer.image.copy() + transition_menu = self.get_menu(name) + self._menu_renderer.set_menu(transition_menu) + target_image = self._menu_renderer.render(display_on_device=False) + transition_attributes["transition_speed"] = self._menu_renderer_config.transition_speed + self._transition.perform(source_image, target_image, transition_attributes) + + self.set_active_menu(name) + + def show_message(self, message: str, sleep=1): + """Displays a blocking message on the screen that stays for sleep seconds""" + self.get_menu_renderer().message = message + self.get_menu_renderer().render() + time.sleep(sleep) + self.get_menu_renderer().message = "" + + def show_timed_message(self, message: str): + """Shows a message on the screen. The timed message is non-blocking for the main loop and + simply redraws the message on screen if necessary.""" + self.get_menu_renderer().message = message + self.get_menu_renderer().render() + + def show_mini_message(self, message: str, sleep=1): + """The mini message is a message on the screen that only coveres the center portion + of the screen, i.e., the remaining content is still visible on the screen while the mini + message is shown in the middle. This version is blocking and stays for sleep seconds.""" + self.get_menu_renderer().mini_message = message + self.get_menu_renderer().render() + time.sleep(sleep) + self.get_menu_renderer().mini_message = "" + + def show_timed_mini_message(self, message: str): + """The mini message is a message on the screen that only coveres the center portion of + the screen, i.e., the remaining content is still visible on the screen while the mini + message is shown in the middle. This version is non-blocking for the main loop and + simply redraws the mini message on screen if necessary.""" + self.get_menu_renderer().mini_message = message + self.get_menu_renderer().render() + + def show_splash_screen(self, filename, sleep=2): + """Shows a splash screen for a given number of seconds.""" + image = Image.open(filename).convert("1") + self.get_menu_renderer().update_display_image(image) + time.sleep(sleep) + + def update(self): + """Updates the menu / draws the screen if necessary.""" + self._menu_renderer.update() diff --git a/python/ctrlboard/src/menu/menu_renderer.py b/python/ctrlboard/src/menu/menu_renderer.py new file mode 100644 index 00000000..8f7c17bf --- /dev/null +++ b/python/ctrlboard/src/menu/menu_renderer.py @@ -0,0 +1,254 @@ +"""Module provides the abstract menu renderer class""" +import time +import math +import itertools + +from abc import ABC, abstractmethod +from pydoc import locate + +from typing import Optional +from PIL import Image +from PIL import ImageDraw +from PIL import ImageFont +from menu.menu import Menu +from menu.menu_renderer_config import MenuRendererConfig +from menu.screensaver import ScreenSaver + + +class MenuRenderer(ABC): + """The abstract menu renderer class provides the base for concrete menu + renderer classes that implement functionality based on conrete hardware or available APIs.""" + def __init__(self, config: MenuRendererConfig): + self.message = "" + self.mini_message = "" + self._menu = None + self._config = config + self.disp = self.display_init() + + self.image = Image.new('1', (self.disp.width, self.disp.height)) + self.draw = ImageDraw.Draw(self.image) + self.font = ImageFont.truetype(config.font_path, size=config.font_size) + # just a sample text to work with the font height + _, self.font_height = self.font.getsize("ABCabc") + self.cursor_position = 0 + self.frame_start_row = 0 + self.render_timestamp = None + # effectively a small state machine that deals with the scrolling + self._perform_scrolling_stage = 0 + self._x_scrolling = 0 + self._current_line_horizontal_overlap = None + self._stage_timestamp: Optional[int] = None + + screensaver = locate(self._config.screensaver) + # noinspection PyCallingNonCallable + self.screensaver: ScreenSaver = screensaver(self._config.screensaver_delay, self) + + @abstractmethod + def display_init(self): + """Method initializes the displays for usage.""" + + @abstractmethod + def display_clear(self): + """Methods clears the screen. Possible hardware clear call if necessary.""" + + @abstractmethod + def blank_screen(self): + """Method blanks the screen. Based on drawing a blank rectangle.""" + + @abstractmethod + def update_display_image(self, image): + """Method displays an image using PIL.""" + + @abstractmethod + def update_display(self): + """Method updates the display.""" + + def set_config(self, config: MenuRendererConfig): + """Configures the menu renderer with a generic menu renderer configuration.""" + self._config = config + + def get_config(self): + """Returns the menu renderer configuration.""" + return self._config + + def set_menu(self, menu: Menu): + """Method sets the menu that the menu renderer should draw.""" + self._menu = menu + + def rows_per_screen(self): + """Calculates the number of rows per screen based on the configured font size.""" + rows = self.disp.height / self.font_height + return math.floor(rows) + + def draw_row(self, row_number: int, text: str, selected: bool): + """Draws a single row of the menu.""" + x_pos = 0 + y_pos = row_number*self.font_height + if selected: + selection_extension = 0 + if row_number < self.rows_per_screen(): + selection_extension = self._config.row_selection_pixel_extension + self.draw.rectangle((x_pos, y_pos, self.disp.width, + y_pos+self._config.font_size+selection_extension), + outline=0, fill=255) + + # in stage 1, we initialize scrolling for the currently selected line + if self._perform_scrolling_stage == 1: + self.setup_horizontal_scrolling(text) + self._perform_scrolling_stage = 2 # don't repeat once the scrolling has started + + # in stage 2, we know the details and can thus perform the scrolling to the left + if self._perform_scrolling_stage == 2: + if self._current_line_horizontal_overlap+self._x_scrolling > 0: + self._x_scrolling -= 1 + if self._current_line_horizontal_overlap+self._x_scrolling == 0: + self._stage_timestamp = int(time.time()) + self._perform_scrolling_stage = 3 + + # in stage 3, we wait for a little when we have scrolled to the end of the text + if self._perform_scrolling_stage == 3: + current_time = int(time.time()) + time_diff = current_time - self._stage_timestamp + if time_diff >= int(self._config.scroll_line_end_delay): + self._stage_timestamp = None + self._perform_scrolling_stage = 4 + + # in stage 4, we scroll back to the right + if self._perform_scrolling_stage == 4: + if self._current_line_horizontal_overlap+self._x_scrolling >= 0: + self._x_scrolling += 1 + + if (self._current_line_horizontal_overlap + + self._x_scrolling) == self._current_line_horizontal_overlap: + self._stage_timestamp = int(time.time()) + self._perform_scrolling_stage = 5 + + # in stage 5, we wait again for a little while before we start again + if self._perform_scrolling_stage == 5: + current_time = int(time.time()) + time_diff = current_time - self._stage_timestamp + if time_diff >= int(self._config.scroll_line_end_delay): + self._stage_timestamp = None + self._perform_scrolling_stage = 2 + + self.draw.text((x_pos+self._x_scrolling, y_pos), text, font=self.font, + spacing=0, stroke_fill=0, fill=0) + else: + self.draw.text((x_pos, y_pos), text, font=self.font, spacing=0, stroke_fill=0, fill=255) + + def draw_fullsceen_message(self, text: str): + """Draws a fullscreen message, i.e., a full-screen message.""" + font_width, font_height = self.font.getsize(text) + centered_width = (self.disp.width - font_width) / 2 + centered_height = (self.disp.height - font_height) / 2 + + self.draw.rectangle((0, 0, self.disp.width, self.disp.height), outline=0, fill=255) + self.draw.text((centered_width, centered_height), text, align="center", font=self.font, + stroke_fill=0, fill=0, textsize=20) + + def draw_mini_message(self, text: str): + """Draws a fullscreen message, i.e., a message covering only the center portion of + the screen. The remaining areas stay visible.""" + font_width, _ = self.font.getsize(text) + centered_width = (self.disp.width - font_width) / 2 + centered_height = (self.disp.height - self.font_height) / 2 + + self.draw.rectangle((0, centered_height-4, self.disp.width, + centered_height+self.font_height+4), outline=0, fill=255) + self.draw.text((centered_width, centered_height), text, align="center", font=self.font, + stroke_fill=0, fill=0, textsize=20) + + def draw_menu(self): + """Method draws the menu set to the class instance.""" + if self._menu.item_selection >= self.frame_start_row + self.rows_per_screen(): + if self._config.scroll_behavior == "page": + self.frame_start_row = self.frame_start_row + (round(self.rows_per_screen()/2)) + 1 + if self.frame_start_row > len(self._menu.entries) - self.rows_per_screen(): + self.frame_start_row = len(self._menu.entries) - self.rows_per_screen() + else: # extend as default behavior + self.frame_start_row = self._menu.item_selection + 1 - self.rows_per_screen() + + if self._menu.item_selection < self.frame_start_row: + if self._config.scroll_behavior == "page": + self.frame_start_row = self.frame_start_row - (round(self.rows_per_screen()/2)) - 1 + if self.frame_start_row < 0: + self.frame_start_row = 0 + else: # extend as default behavior + self.frame_start_row = self._menu.item_selection + + self.draw_menu_frame(self.frame_start_row, self.frame_start_row+self.rows_per_screen()) + + def draw_menu_frame(self, frame_start_row: int, frame_end_row: int): + """Draws row frame_start_row to frame_end_row of the class instance menu, i.e., it + draws a given frame of the complete menu that fits the screen.""" + self.draw.rectangle((0, 0, self.disp.width, self.disp.height), outline=0, fill=0) + row_on_screen = 0 + row_in_menuitems = frame_start_row + for menu_entry in itertools.islice(self._menu.entries, frame_start_row, frame_end_row): + if row_in_menuitems == self._menu.item_selection: + self.draw_row(row_on_screen, menu_entry["text"], True) + else: + self.draw_row(row_on_screen, menu_entry["text"], False) + row_in_menuitems += 1 + row_on_screen += 1 + if row_on_screen >= self.rows_per_screen(): + break + + def render(self, display_on_device=True): + """Method renders the menu.""" + if display_on_device is True: + self.screensaver.reset_timer() + self._perform_scrolling_stage = 0 + self._current_line_horizontal_overlap = None + self._x_scrolling = 0 + + if self._menu is None: + self.draw_fullsceen_message("No menu set!") + self.disp.image(self.image) + + if display_on_device is True: + self.disp.show() + return None + + self.display_clear() + + if self.message != "": + self.draw_fullsceen_message(self.message) + elif self.mini_message != "": + self.draw_mini_message(self.mini_message) + else: + self.draw_menu() + + if display_on_device is True: + self.update_display_image(self.image) + self.update_display() + + self.render_timestamp = int(time.time()) + + return self.image + + def setup_horizontal_scrolling(self, text): + """Configure horizontal scrolling based on the configured screen dimensions.""" + font_width, _ = self.font.getsize(text) + self._current_line_horizontal_overlap = font_width - self.disp.width + + def update(self): + """Method updates the menu drawing within the main loop in a non-blocking manner. + Also updates the current entry scrolling if activated.""" + if self._config.scroll_line is False: + return + + self.screensaver.check_timer() + + if self.screensaver.enabled is True: + self.screensaver.draw() + return + + current_time = int(time.time()) + time_diff = current_time - self.render_timestamp + + if time_diff >= self._config.scroll_delay: + if self._perform_scrolling_stage == 0: + self._perform_scrolling_stage = 1 + self.draw_menu() + self.update_display_image(self.image) diff --git a/python/ctrlboard/src/menu/menu_renderer_adafruit_ssd1306.py b/python/ctrlboard/src/menu/menu_renderer_adafruit_ssd1306.py new file mode 100644 index 00000000..d5d4c2c8 --- /dev/null +++ b/python/ctrlboard/src/menu/menu_renderer_adafruit_ssd1306.py @@ -0,0 +1,34 @@ +"""Module providing the Adafruit SSD1306 menu renderer class""" +# pylint: disable=import-error +from board import SCL, SDA +import busio +import adafruit_ssd1306 +from menu.menu_renderer import MenuRenderer + + +class MenuRendererAdafruitSSD1306(MenuRenderer): + """Class implementing a menu renderer using the Adafruit SSD1306 library""" + + def display_init(self): + i2c = busio.I2C(SCL, SDA) + self.disp = adafruit_ssd1306.SSD1306_I2C(self._config.width, self._config.height, i2c, + addr=self._config.i2c_address) + self.disp.rotation = self._config.get_mapped_rotation() + self.disp.fill(0) + self.disp.show() + + return self.disp + + def update_display_image(self, image): + self.disp.image(self.image) + self.disp.show() + + def update_display(self): + self.disp.show() + + def display_clear(self): + self.disp.fill(0) + + def blank_screen(self): + self.disp.fill(0) + self.disp.show() diff --git a/python/ctrlboard/src/menu/menu_renderer_config.py b/python/ctrlboard/src/menu/menu_renderer_config.py new file mode 100644 index 00000000..0d12e72c --- /dev/null +++ b/python/ctrlboard/src/menu/menu_renderer_config.py @@ -0,0 +1,39 @@ +"""Module for configuring menu renderer instances""" + + +# pylint: disable=too-many-instance-attributes, too-few-public-methods +class MenuRendererConfig: + """Class for configuring menu renderer instances. Provides configuration options + such as width, height, i2c address, font, transitions, etc.""" + _rotation_mapper = { + 0: 0, + 90: 1, + 180: 2, + 270: 3 + } + + def __init__(self): + self.width = 128 + self.height = 64 + self.i2c_address = 0x3c + self.i2c_port = 1 + self.display_type = "ssd1306" # luma-oled supported devices, "sh1106", "ssd1306", ... + self.font_path = 'resources/DejaVuSansMono-Bold.ttf' + self.font_size = 12 + self.row_selection_pixel_extension = 2 + self.scroll_behavior = "page" # "extend" or "page" + self.transition = "PushTransition" # "PushTransition" or "None + self.transition_attributes_left = {"direction": "push_left"} + self.transition_attributes_right = {"direction": "push_right"} + self.transition_speed = 10 + self.scroll_line = True + self.scroll_delay = 3 + self.scroll_line_end_delay = 2 + self.screensaver = "menu.blank_screensaver.BlankScreenSaver" + self.screensaver_delay = 25 + self.rotation = 0 # 0, 180 + + def get_mapped_rotation(self): + """Converts human-readable rotation value to the one expected + by the luma and adafruit libraries""" + return self._rotation_mapper[self.rotation] diff --git a/python/ctrlboard/src/menu/menu_renderer_luma_oled.py b/python/ctrlboard/src/menu/menu_renderer_luma_oled.py new file mode 100644 index 00000000..e6c30b53 --- /dev/null +++ b/python/ctrlboard/src/menu/menu_renderer_luma_oled.py @@ -0,0 +1,34 @@ +"""Module providing the luma oled menu renderer class""" +from luma.core.interface.serial import i2c +from menu.menu_renderer import MenuRenderer + + +class MenuRendererLumaOled(MenuRenderer): + """Class implementing the luma oled menu renderer""" + def display_init(self): + serial = i2c(port=self._config.i2c_port, address=self._config.i2c_address) + import luma.oled.device + device = getattr(luma.oled.device, self._config.display_type) + + self.disp = device(serial_interface=serial, width=self._config.width, + height=self._config.height, + rotate=self._config.get_mapped_rotation()) + + self.disp.clear() + self.disp.show() + + return self.disp + + def update_display_image(self, image): + self.disp.display(image) + + def update_display(self): + self.disp.display(self.image) + + def display_clear(self): + pass + + def blank_screen(self): + self.disp.clear() + self.draw.rectangle((0, 0, self.disp.width, self.disp.height), outline=0, fill=0) + self.disp.show() diff --git a/python/ctrlboard/src/menu/screensaver.py b/python/ctrlboard/src/menu/screensaver.py new file mode 100644 index 00000000..f291f753 --- /dev/null +++ b/python/ctrlboard/src/menu/screensaver.py @@ -0,0 +1,33 @@ +"""Module providing the menu screensaver class""" +from abc import abstractmethod +from menu.timer import Timer + + +class ScreenSaver: + """Class implementing the menu screensaver""" + + def __init__(self, activation_delay, menu_renderer): + self.enabled = False + self.menu_renderer = menu_renderer + self.screensaver_activation_delay = activation_delay + self.timer_flag = Timer(self.screensaver_activation_delay) + + def draw(self): + """Draws the screen saver in a non-blocking way if enabled.""" + if self.enabled is True: + self.draw_screensaver() + + @abstractmethod + def draw_screensaver(self): + """Draws the screen saver. Must be implemented in subclasses.""" + + def check_timer(self): + """Checks if the screen saver should be enabled given the configured + activation delay.""" + self.timer_flag.check_timer() + self.enabled = self.timer_flag.enabled + + def reset_timer(self): + """Resets the screen saver timer if an activitiy happend.""" + self.timer_flag.reset_timer() + self.enabled = self.timer_flag.enabled diff --git a/python/ctrlboard/src/menu/timer.py b/python/ctrlboard/src/menu/timer.py new file mode 100644 index 00000000..3d41cc6b --- /dev/null +++ b/python/ctrlboard/src/menu/timer.py @@ -0,0 +1,24 @@ +"""Module providing a timer class""" +import time + + +class Timer: + """Class implementing a timer class. Takes an activation delay and + sets a flag if the activation delay exprires.""" + def __init__(self, activation_delay): + self.start_timestamp = int(time.time()) + self.activation_delay = activation_delay + self.enabled = False + + def check_timer(self): + """Checks the timer whether it has reached the activation delay.""" + current_timestamp = int(time.time()) + timestamp_diff = current_timestamp - self.start_timestamp + + if timestamp_diff >= self.activation_delay: + self.enabled = True + + def reset_timer(self): + """Resets the timer and starts from the beginning.""" + self.start_timestamp = int(time.time()) + self.enabled = False diff --git a/python/ctrlboard/src/menu/transition.py b/python/ctrlboard/src/menu/transition.py new file mode 100644 index 00000000..a61df0a5 --- /dev/null +++ b/python/ctrlboard/src/menu/transition.py @@ -0,0 +1,65 @@ +"""Module providing implementations for menu transitions.""" +from abc import abstractmethod +from PIL import Image + + +# pylint: disable=too-few-public-methods +class Transition: + """Class provides the interface for menu transitions. Must be subclassed.""" + + def __init__(self, disp): + self.disp = disp + + @abstractmethod + def perform(self, start_image: Image, end_image: Image, transition_attributes=None) -> None: + """Signature for an abstract transition. Must be implemented in subclasses.""" + + +class PushTransition(Transition): + """Class implementing a push left/right transition.""" + PUSH_LEFT_TRANSITION = "push_left" + PUSH_RIGHT_TRANSITION = "push_right" + + def __init__(self, disp): + super().__init__(disp) + self.transition_attributes = None + + def perform(self, start_image: Image, end_image: Image, transition_attributes=None): + """Performs a push transition to the left or right depending on the + transition attribute 'direction'.""" + direction = {} + self.transition_attributes = transition_attributes + if transition_attributes is not None and transition_attributes != {}: + direction = transition_attributes["direction"] + + transition_image = Image.new('1', (self.disp.width, self.disp.height)) + + if direction == PushTransition.PUSH_LEFT_TRANSITION: + self.perform_push_left(end_image, start_image, transition_image) + elif direction == PushTransition.PUSH_RIGHT_TRANSITION: + self.perform_push_right(end_image, start_image, transition_image) + else: + self.disp.image(end_image) + self.disp.show() + + def perform_push_left(self, end_image, start_image, transition_image): + """Implements a push left transition. Is called by perform depending on the transition + attribute 'direction'.""" + for x_pos in range(0, 128, self.transition_attributes["transition_speed"]): + left_region = start_image.crop((x_pos, 0, 128, 64)) + right_region = end_image.crop((0, 0, x_pos, 64)) + transition_image.paste(left_region, (0, 0, 128 - x_pos, 64)) + transition_image.paste(right_region, (128 - x_pos, 0, 128, 64)) + self.disp.display(transition_image) + self.disp.display(end_image) + + def perform_push_right(self, end_image, start_image, transition_image): + """Implements a push right transition. Is called by perform depending on the transition + attribute 'direction'.""" + for x_pos in range(0, 128, self.transition_attributes["transition_speed"]): + left_region = start_image.crop((0, 0, 128-x_pos, 64)) + right_region = end_image.crop((128-x_pos, 0, 128, 64)) + transition_image.paste(left_region, (x_pos, 0, 128, 64)) + transition_image.paste(right_region, (0, 0, x_pos, 64)) + self.disp.display(transition_image) + self.disp.display(end_image) diff --git a/python/ctrlboard/src/observable.py b/python/ctrlboard/src/observable.py new file mode 100644 index 00000000..946dd1c5 --- /dev/null +++ b/python/ctrlboard/src/observable.py @@ -0,0 +1,21 @@ +"""Module for Observable part of the Observer pattern functionality""" +from typing import List +from observer import Observer + + +class Observable: + """Class implementing the Observable pattern""" + _observers: List[Observer] = [] + + def attach(self, observer: Observer): + """Attaches an observer to an obserable object""" + self._observers.append(observer) + + def detach(self, observer: Observer): + """detaches an observer from an observable object""" + self._observers.remove(observer) + + def notify(self, updated_object): + """Notifies all observers with a given object parameter""" + for observer in self._observers: + observer.update(updated_object) diff --git a/python/ctrlboard/src/observer.py b/python/ctrlboard/src/observer.py new file mode 100644 index 00000000..0fda4830 --- /dev/null +++ b/python/ctrlboard/src/observer.py @@ -0,0 +1,10 @@ +"""Module implementing the Observer part of the Observer pattern""" +from abc import ABC, abstractmethod + + +# pylint: disable=too-few-public-methods +class Observer(ABC): + """Class implementing an abserver""" + @abstractmethod + def update(self, updated_object) -> None: + """Abstract method for updating an observer. Needs to be implemented by subclasses.""" diff --git a/python/ctrlboard/src/rascsi_menu_controller.py b/python/ctrlboard/src/rascsi_menu_controller.py new file mode 100644 index 00000000..9b0a538d --- /dev/null +++ b/python/ctrlboard/src/rascsi_menu_controller.py @@ -0,0 +1,32 @@ +"""Module implementing the RaSCSI Control Board UI specific menu controller""" +from ctrlboard_menu_builder import CtrlBoardMenuBuilder +from menu.menu_builder import MenuBuilder +from menu.menu_controller import MenuController +from menu.menu_renderer import MenuRenderer +from menu.timer import Timer + + +class RascsiMenuController(MenuController): + """Class implementing a RaSCSI Control Board UI specific menu controller""" + + def __init__(self, refresh_interval, menu_builder: MenuBuilder, + menu_renderer=None, menu_renderer_config=None): + super().__init__(menu_builder, menu_renderer, menu_renderer_config) + self._refresh_interval = refresh_interval + self._menu_renderer: MenuRenderer = menu_renderer + self._scsi_list_refresh_timer_flag = Timer(self._refresh_interval) + + def segue(self, name, context_object=None, transition_attributes=None): + super().segue(name, context_object, transition_attributes) + self._scsi_list_refresh_timer_flag.reset_timer() + + def update(self): + super().update() + + if self.get_active_menu().name == CtrlBoardMenuBuilder.SCSI_ID_MENU: + self._scsi_list_refresh_timer_flag.check_timer() + if self._scsi_list_refresh_timer_flag.enabled is True: + self._scsi_list_refresh_timer_flag.reset_timer() + self.refresh(name=CtrlBoardMenuBuilder.SCSI_ID_MENU) + if self._menu_renderer.screensaver.enabled is False: + self.set_active_menu(CtrlBoardMenuBuilder.SCSI_ID_MENU, display_on_device=False) diff --git a/python/ctrlboard/start.sh b/python/ctrlboard/start.sh new file mode 100755 index 00000000..bf268346 --- /dev/null +++ b/python/ctrlboard/start.sh @@ -0,0 +1,128 @@ +#!/usr/bin/env bash +set -e +# set -x # Uncomment to Debug + +PI_MODEL=$(/usr/bin/tr -d '\0' < /proc/device-tree/model) + +cd "$(dirname "$0")" +# verify packages installed +ERROR=0 +if ! command -v dpkg -l i2c-tools &> /dev/null ; then + echo "i2c-tools could not be found" + echo "Run 'sudo apt install i2c-tools' to fix." + ERROR=1 +fi +if ! command -v python3 &> /dev/null ; then + echo "python3 could not be found" + echo "Run 'sudo apt install python3' to fix." + ERROR=1 +fi +if ! python3 -m venv --help &> /dev/null ; then + echo "venv could not be found" + echo "Run 'sudo apt install python3-venv' to fix." + ERROR=1 +fi +# Dep to build Pillow +if ! dpkg -l python3-dev &> /dev/null; then + echo "python3-dev could not be found" + echo "Run 'sudo apt install python3-dev' to fix." + ERROR=1 +fi +if ! dpkg -l libjpeg-dev &> /dev/null; then + echo "libjpeg-dev could not be found" + echo "Run 'sudo apt install libjpeg-dev' to fix." + ERROR=1 +fi +if ! dpkg -l libpng-dev &> /dev/null; then + echo "libpng-dev could not be found" + echo "Run 'sudo apt install libpng-dev' to fix." + ERROR=1 +fi +if ! dpkg -l libopenjp2-7-dev &> /dev/null; then + echo "libopenjp2-7-dev could not be found" + echo "Run 'sudo apt install libopenjp2-7-dev' to fix." + ERROR=1 +fi +if [ $ERROR = 1 ] ; then + echo + echo "Fix errors and re-run ./start.sh" + exit 1 +fi + +if pgrep -f "python3 src/main.py" &> /dev/null; then + echo "Detected active rascsi control board service" + echo "Terminating before launching a new one." + sudo pkill -f "python3 src/main.py" +fi + +if ! i2cdetect -y 1 &> /dev/null ; then + echo "i2cdetect -y 1 did not find a screen." + exit 2 +fi + +# Compiler flags needed for gcc v10 and up +if [[ `gcc --version | awk '/gcc/' | awk -F ' ' '{print $3}' | awk -F '.' '{print $1}'` -ge 10 ]]; then + COMPILER_FLAGS="-fcommon" +fi + +# Test for two known broken venv states +if test -e venv; then + GOOD_VENV=true + ! test -e venv/bin/activate && GOOD_VENV=false + pip3 list 1> /dev/null + test $? -eq 1 && GOOD_VENV=false + if ! "$GOOD_VENV"; then + echo "Deleting bad python venv" + sudo rm -rf venv + fi +fi + +# Create the venv if it doesn't exist +if ! test -e venv; then + echo "Creating python venv for RaSCSI control board service" + python3 -m venv venv --system-site-packages + echo "Activating venv" + source venv/bin/activate + echo "Installing requirements.txt" + pip3 install wheel + CFLAGS="$COMPILER_FLAGS" pip3 install -r requirements.txt + + set +e + git rev-parse --is-inside-work-tree &> /dev/null + if [[ $? -eq 0 ]]; then + git rev-parse HEAD > current + fi +fi + +source venv/bin/activate + +# Detect if someone updates the git repo - we need to re-run pip3 install. +set +e +git rev-parse --is-inside-work-tree &> /dev/null +if [[ $? -eq 0 ]]; then + set -e + if ! test -e current; then + git rev-parse > current + elif [ "$(cat current)" != "$(git rev-parse HEAD)" ]; then + echo "New version detected, updating libraries from requirements.txt" + CFLAGS="$COMPILER_FLAGS" pip3 install -r requirements.txt + git rev-parse HEAD > current + fi +else + echo "Warning: Not running from a valid git repository. Will not be able to update the code." +fi +set -e + +export PYTHONPATH=${PWD}/src:${PWD}/../common/src + +echo "Starting RaSCSI control board service..." + +if [[ ${PI_MODEL} =~ "Raspberry Pi 4" ]] || [[ ${PI_MODEL} =~ "Raspberry Pi 3" ]] || + [[ ${PI_MODEL} =~ "Raspberry Pi Zero 2" ]]; then + echo "Detected: Raspberry Pi 4, Pi 3 or Pi Zero 2" + python3 src/main.py "$@" --transitions 1 +else + echo "Detected: Raspberry Pi Zero, Zero W, Zero WH, ..." + echo "Transition animations will be disabled." + python3 src/main.py "$@" --transitions 0 +fi