mirror of
https://github.com/irmen/prog8.git
synced 2025-01-11 13:29:45 +00:00
timer irq changes
This commit is contained in:
parent
8bfe21dd65
commit
12ae95bbbc
@ -1,7 +1,7 @@
|
||||
; source code for a tinyvm program for the timer
|
||||
%block b1_timer
|
||||
%vardefs
|
||||
var byte teller_timer 0
|
||||
var byte teller_timer 32
|
||||
const word screenloc_timer 1028
|
||||
const byte one_timer 1
|
||||
%end_vardefs
|
||||
|
@ -5,7 +5,7 @@ var word teller 0
|
||||
var word numbertoprint 0
|
||||
const byte one 1
|
||||
const word thousand 1000
|
||||
const byte space_chr 10
|
||||
const byte space_chr 32
|
||||
const word screenstart 1024
|
||||
%end_vardefs
|
||||
|
||||
|
@ -1,7 +1,7 @@
|
||||
import enum
|
||||
import array
|
||||
import operator
|
||||
from typing import List, Dict, Optional, Union, Callable
|
||||
from typing import List, Dict, Optional, Union, Callable, Any
|
||||
|
||||
|
||||
class Opcode(enum.IntEnum):
|
||||
@ -109,7 +109,9 @@ class Value:
|
||||
def __floordiv__(self, other: 'Value') -> 'Value':
|
||||
return self.number_arithmetic(self, operator.floordiv, other)
|
||||
|
||||
def __eq__(self, other: 'Value') -> bool:
|
||||
def __eq__(self, other: Any) -> bool:
|
||||
if not isinstance(other, Value):
|
||||
return False
|
||||
return self.number_comparison(self, operator.eq, other)
|
||||
|
||||
def __lt__(self, other: 'Value') -> bool:
|
||||
@ -125,7 +127,6 @@ class Value:
|
||||
return self.number_comparison(self, operator.ge, other)
|
||||
|
||||
|
||||
|
||||
class Variable:
|
||||
__slots__ = ["name", "value", "dtype", "length", "height", "const"]
|
||||
|
||||
|
109
tinyvm/vm.py
109
tinyvm/vm.py
@ -51,7 +51,7 @@
|
||||
# call function (arguments are on stack)
|
||||
# enter / exit (function call frame)
|
||||
#
|
||||
# TIMER INTERRUPT: triggered around each 1/30th of a second.
|
||||
# TIMER INTERRUPT: triggered around each 1/60th of a second.
|
||||
# executes on a DIFFERENT stack and with a different PROGRAM LIST,
|
||||
# but with access to ALL THE SAME DYNAMIC VARIABLES.
|
||||
# This suspends the main program until the timer program RETURNs!
|
||||
@ -65,7 +65,7 @@ import threading
|
||||
import pprint
|
||||
import tkinter
|
||||
import tkinter.font
|
||||
from typing import Dict, List, Tuple, Union
|
||||
from typing import Dict, List, Tuple, Union, no_type_check
|
||||
from il65.emit import mflpt5_to_float, to_mflpt5
|
||||
from .program import Instruction, Variable, Block, Program, Opcode, Value, DataType
|
||||
|
||||
@ -224,8 +224,6 @@ class VM:
|
||||
str_alt_encoding = "iso-8859-15"
|
||||
readonly_mem_ranges = [] # type: List[Tuple[int, int]]
|
||||
timer_irq_resolution = 1/30
|
||||
timer_irq_interlock = threading.Lock()
|
||||
timer_irq_event = threading.Event()
|
||||
|
||||
def __init__(self, program: Program, timerprogram: Program=None) -> None:
|
||||
opcode_names = [oc.name for oc in Opcode]
|
||||
@ -263,7 +261,6 @@ class VM:
|
||||
if i.opcode in (Opcode.CALL, Opcode.JUMP_IF_FALSE, Opcode.JUMP_IF_TRUE)), "main: alt_nexts must be set"
|
||||
assert all(i.alt_next for i in self.timer_program
|
||||
if i.opcode in (Opcode.CALL, Opcode.JUMP_IF_FALSE, Opcode.JUMP_IF_TRUE)), "timer: alt_nexts must be set"
|
||||
threading.Thread(target=self.timer_irq, name="timer_irq", daemon=True).start()
|
||||
print("[TinyVM starting up.]")
|
||||
|
||||
def enable_charscreen(self, screen_address: int, width: int, height: int) -> None:
|
||||
@ -339,15 +336,24 @@ class VM:
|
||||
threading.Thread(target=ScreenViewer.create,
|
||||
args=(self.memory, self.system, self.charscreen_address, self.charscreen_width, self.charscreen_height),
|
||||
name="screenviewer", daemon=True).start()
|
||||
time.sleep(0.05)
|
||||
|
||||
self.pc = self.program[0] # first instruction of the main program
|
||||
self.stack.push(CallFrameMarker(None)) # enter the call frame so the timer program can end with a RETURN
|
||||
try:
|
||||
counter = 0
|
||||
previous_timer_irq = time.perf_counter()
|
||||
while self.pc is not None:
|
||||
with self.timer_irq_interlock:
|
||||
next_pc = self.dispatch_table[self.pc.opcode](self, self.pc)
|
||||
if next_pc:
|
||||
self.pc = self.pc.next
|
||||
next_pc = self.dispatch_table[self.pc.opcode](self, self.pc)
|
||||
if next_pc:
|
||||
self.pc = self.pc.next
|
||||
counter += 1
|
||||
if self.charscreen_address and counter % 1000 == 0:
|
||||
time.sleep(0.001) # allow the tkinter window to update
|
||||
time_since_irq = time.perf_counter() - previous_timer_irq
|
||||
if time_since_irq > 1/60:
|
||||
self.timer_irq()
|
||||
previous_timer_irq = time.perf_counter()
|
||||
except TerminateExecution as x:
|
||||
why = str(x)
|
||||
print("[TinyVM execution terminated{:s}]\n".format(": "+why if why else "."))
|
||||
@ -360,34 +366,23 @@ class VM:
|
||||
print("[TinyVM execution ended.]")
|
||||
|
||||
def timer_irq(self) -> None:
|
||||
# This is the timer 'irq' handler. It runs the timer program at a certain interval.
|
||||
# NOTE: executing the timer program will LOCK the main program and vice versa!
|
||||
# (because the VM is a strictly single threaded machine)
|
||||
resolution = 1/30
|
||||
wait_time = resolution
|
||||
while True:
|
||||
self.timer_irq_event.wait(wait_time)
|
||||
self.timer_irq_event.clear()
|
||||
start = time.perf_counter()
|
||||
if self.timer_program:
|
||||
with self.timer_irq_interlock:
|
||||
previous_pc = self.pc
|
||||
previous_program = self.program
|
||||
previous_stack = self.stack
|
||||
self.stack = self.timer_stack
|
||||
self.program = self.timer_program
|
||||
self.pc = self.program[0]
|
||||
self.stack.push(CallFrameMarker(None)) # enter the call frame so the timer program can end with a RETURN
|
||||
while self.pc is not None:
|
||||
next_pc = self.dispatch_table[self.pc.opcode](self, self.pc)
|
||||
if next_pc:
|
||||
self.pc = self.pc.next
|
||||
self.pc = previous_pc
|
||||
self.program = previous_program
|
||||
self.stack = previous_stack
|
||||
current = time.perf_counter()
|
||||
duration, previously = current - start, current
|
||||
wait_time = max(0, resolution - duration)
|
||||
# This is the timer 'irq' handler. It is called to run the timer program at a certain interval.
|
||||
# During the execution the main program is halted
|
||||
if self.timer_program:
|
||||
previous_pc = self.pc
|
||||
previous_program = self.program
|
||||
previous_stack = self.stack
|
||||
self.stack = self.timer_stack
|
||||
self.program = self.timer_program
|
||||
self.pc = self.program[0]
|
||||
self.stack.push(CallFrameMarker(None)) # enter the call frame so the timer program can end with a RETURN
|
||||
while self.pc is not None:
|
||||
next_pc = self.dispatch_table[self.pc.opcode](self, self.pc)
|
||||
if next_pc:
|
||||
self.pc = self.pc.next
|
||||
self.pc = previous_pc
|
||||
self.program = previous_program
|
||||
self.stack = previous_stack
|
||||
|
||||
def debug_stack(self, size: int=5) -> None:
|
||||
stack = self.stack.debug_peek(size)
|
||||
@ -416,7 +411,7 @@ class VM:
|
||||
raise TerminateExecution()
|
||||
|
||||
def opcode_PUSH(self, instruction: Instruction) -> bool:
|
||||
value = self.variables[instruction.args[0]].value
|
||||
value = self.variables[instruction.args[0]].value # type: ignore
|
||||
self.stack.push(value)
|
||||
return True
|
||||
|
||||
@ -435,12 +430,14 @@ class VM:
|
||||
self.stack.push2(value2, value1)
|
||||
return True
|
||||
|
||||
@no_type_check
|
||||
def opcode_PUSH2(self, instruction: Instruction) -> bool:
|
||||
value1 = self.variables[instruction.args[0]].value
|
||||
value2 = self.variables[instruction.args[1]].value
|
||||
self.stack.push2(value1, value2)
|
||||
return True
|
||||
|
||||
@no_type_check
|
||||
def opcode_PUSH3(self, instruction: Instruction) -> bool:
|
||||
value1 = self.variables[instruction.args[0]].value
|
||||
value2 = self.variables[instruction.args[1]].value
|
||||
@ -448,12 +445,14 @@ class VM:
|
||||
self.stack.push3(value1, value2, value3)
|
||||
return True
|
||||
|
||||
@no_type_check
|
||||
def opcode_POP(self, instruction: Instruction) -> bool:
|
||||
value = self.stack.pop()
|
||||
variable = self.variables[instruction.args[0]]
|
||||
self.assign_variable(variable, value)
|
||||
return True
|
||||
|
||||
@no_type_check
|
||||
def opcode_POP2(self, instruction: Instruction) -> bool:
|
||||
value1, value2 = self.stack.pop2()
|
||||
variable = self.variables[instruction.args[0]]
|
||||
@ -462,6 +461,7 @@ class VM:
|
||||
self.assign_variable(variable, value2)
|
||||
return True
|
||||
|
||||
@no_type_check
|
||||
def opcode_POP3(self, instruction: Instruction) -> bool:
|
||||
value1, value2, value3 = self.stack.pop3()
|
||||
variable = self.variables[instruction.args[0]]
|
||||
@ -472,24 +472,28 @@ class VM:
|
||||
self.assign_variable(variable, value3)
|
||||
return True
|
||||
|
||||
@no_type_check
|
||||
def opcode_ADD(self, instruction: Instruction) -> bool:
|
||||
second, first = self.stack.pop2()
|
||||
self.stack.push(first + second) # type: ignore
|
||||
self.stack.push(first + second)
|
||||
return True
|
||||
|
||||
@no_type_check
|
||||
def opcode_SUB(self, instruction: Instruction) -> bool:
|
||||
second, first = self.stack.pop2()
|
||||
self.stack.push(first - second) # type: ignore
|
||||
self.stack.push(first - second)
|
||||
return True
|
||||
|
||||
@no_type_check
|
||||
def opcode_MUL(self, instruction: Instruction) -> bool:
|
||||
second, first = self.stack.pop2()
|
||||
self.stack.push(first * second) # type: ignore
|
||||
self.stack.push(first * second)
|
||||
return True
|
||||
|
||||
@no_type_check
|
||||
def opcode_DIV(self, instruction: Instruction) -> bool:
|
||||
second, first = self.stack.pop2()
|
||||
self.stack.push(first / second) # type: ignore
|
||||
self.stack.push(first / second)
|
||||
return True
|
||||
|
||||
def opcode_AND(self, instruction: Instruction) -> bool:
|
||||
@ -522,21 +526,25 @@ class VM:
|
||||
self.stack.push(Value(DataType.BOOL, first == second))
|
||||
return True
|
||||
|
||||
@no_type_check
|
||||
def opcode_CMP_LT(self, instruction: Instruction) -> bool:
|
||||
second, first = self.stack.pop2()
|
||||
self.stack.push(Value(DataType.BOOL, first < second))
|
||||
return True
|
||||
|
||||
@no_type_check
|
||||
def opcode_CMP_GT(self, instruction: Instruction) -> bool:
|
||||
second, first = self.stack.pop2()
|
||||
self.stack.push(Value(DataType.BOOL, first > second))
|
||||
return True
|
||||
|
||||
@no_type_check
|
||||
def opcode_CMP_LTE(self, instruction: Instruction) -> bool:
|
||||
second, first = self.stack.pop2()
|
||||
self.stack.push(Value(DataType.BOOL, first <= second))
|
||||
return True
|
||||
|
||||
@no_type_check
|
||||
def opcode_CMP_GTE(self, instruction: Instruction) -> bool:
|
||||
second, first = self.stack.pop2()
|
||||
self.stack.push(Value(DataType.BOOL, first >= second))
|
||||
@ -632,7 +640,7 @@ class System:
|
||||
value = self.vm.stack.pop()
|
||||
assert isinstance(value, Value)
|
||||
if value.dtype == DataType.ARRAY_BYTE:
|
||||
print(self.decodestr(value.value), end="@") # type: ignore
|
||||
print(self.decodestr(value.value), end="") # type: ignore
|
||||
return True
|
||||
else:
|
||||
raise TypeError("printstr expects bytearray", value)
|
||||
@ -656,6 +664,7 @@ class System:
|
||||
|
||||
def syscall_decimalstr_signed(self) -> bool:
|
||||
value = self.vm.stack.pop()
|
||||
assert isinstance(value, Value)
|
||||
if value.dtype in (DataType.SBYTE, DataType.SWORD):
|
||||
self.vm.stack.push(Value(DataType.ARRAY_BYTE, self.encodestr(str(value.value))))
|
||||
return True
|
||||
@ -664,6 +673,7 @@ class System:
|
||||
|
||||
def syscall_decimalstr_unsigned(self) -> bool:
|
||||
value = self.vm.stack.pop()
|
||||
assert isinstance(value, Value)
|
||||
if value.dtype in (DataType.BYTE, DataType.WORD):
|
||||
self.vm.stack.push(Value(DataType.ARRAY_BYTE, self.encodestr(str(value.value))))
|
||||
return True
|
||||
@ -737,7 +747,8 @@ class System:
|
||||
class ScreenViewer(tkinter.Tk):
|
||||
def __init__(self, memory: Memory, system: System, screen_addr: int, screen_width: int, screen_height: int) -> None:
|
||||
super().__init__()
|
||||
self.fontsize = 14
|
||||
self.title("IL65 tinyvm")
|
||||
self.fontsize = 16
|
||||
self.memory = memory
|
||||
self.system = system
|
||||
self.address = screen_addr
|
||||
@ -751,11 +762,13 @@ class ScreenViewer(tkinter.Tk):
|
||||
|
||||
def update_screen(self):
|
||||
self.canvas.delete(tkinter.ALL)
|
||||
lines = []
|
||||
for y in range(self.height):
|
||||
line = self.memory.get_bytes(self.address+y*self.width, self.width)
|
||||
text = self.system.decodestr(line)
|
||||
self.canvas.create_text(4, self.fontsize*y, text=text, fill="white", font=self.monospace, anchor=tkinter.NW)
|
||||
self.after(10, self.update_screen)
|
||||
line = self.system.decodestr(self.memory.get_bytes(self.address+y*self.width, self.width))
|
||||
lines.append("".join(c if c.isprintable() else " " for c in line))
|
||||
for y, line in enumerate(lines):
|
||||
self.canvas.create_text(4, self.fontsize*y, text=line, fill="white", font=self.monospace, anchor=tkinter.NW)
|
||||
self.after(30, self.update_screen)
|
||||
|
||||
@classmethod
|
||||
def create(cls, memory: Memory, system: System, screen_addr: int, screen_width: int, screen_height: int) -> None:
|
||||
|
Loading…
x
Reference in New Issue
Block a user