mirror of
https://github.com/irmen/prog8.git
synced 2024-11-25 19:31:36 +00:00
timer things
This commit is contained in:
parent
27e6749533
commit
ab84b6012b
22
testvm-timer.txt
Normal file
22
testvm-timer.txt
Normal file
@ -0,0 +1,22 @@
|
||||
; source code for a tinyvm program for the timer
|
||||
%block b1_timer
|
||||
%vardefs
|
||||
var byte teller_timer 0
|
||||
const word screenloc_timer 1028
|
||||
const byte one_timer 1
|
||||
%end_vardefs
|
||||
|
||||
%instructions
|
||||
push teller_timer
|
||||
push one_timer
|
||||
add
|
||||
dup
|
||||
pop teller_timer
|
||||
push screenloc_timer
|
||||
swap
|
||||
syscall memwrite_word
|
||||
return 0
|
||||
%end_instructions
|
||||
%subblocks
|
||||
%end_subblocks
|
||||
%end_block ;b1_timer
|
@ -5,7 +5,8 @@ var byte teller 0
|
||||
var byte numbertoprint 0
|
||||
const byte one 1
|
||||
const byte thousand 1000
|
||||
const byte space_chr 32
|
||||
const byte space_chr 10
|
||||
const word screenstart 1024
|
||||
%end_vardefs
|
||||
|
||||
%instructions
|
||||
@ -15,8 +16,13 @@ back:
|
||||
add
|
||||
dup
|
||||
dup
|
||||
dup
|
||||
push screenstart
|
||||
swap
|
||||
syscall memwrite_word
|
||||
pop teller
|
||||
call 1 printnumber
|
||||
syscall delay
|
||||
push thousand
|
||||
cmp_lt
|
||||
jump_if_true back
|
||||
|
@ -1,17 +1,26 @@
|
||||
import sys
|
||||
from .parse import Parser
|
||||
from .program import Program, Opcode, Block, Instruction
|
||||
# from .program import Program, Opcode, Block, Instruction
|
||||
from .vm import VM
|
||||
|
||||
mainprogram = None
|
||||
timerprogram = None
|
||||
|
||||
source = open(sys.argv[1]).read()
|
||||
if len(sys.argv) >= 2:
|
||||
source = open(sys.argv[1]).read()
|
||||
parser = Parser(source)
|
||||
mainprogram = parser.parse()
|
||||
|
||||
if len(sys.argv) == 3:
|
||||
source = open(sys.argv[2]).read()
|
||||
parser = Parser(source)
|
||||
timerprogram = parser.parse()
|
||||
|
||||
if len(sys.argv) not in (2, 3):
|
||||
raise SystemExit("provide 1 or 2 program file names as arguments")
|
||||
|
||||
parser = Parser(source)
|
||||
program = parser.parse()
|
||||
timerprogram = Program([Block("timer", None, [], [
|
||||
Instruction(Opcode.RETURN, [], None, None)
|
||||
], {}, [])])
|
||||
# zero page and hardware stack of a 6502 cpu are off limits for now
|
||||
VM.readonly_mem_ranges = [(0x00, 0xff), (0x100, 0x1ff), (0xa000, 0xbfff), (0xe000, 0xffff)]
|
||||
vm = VM(program)
|
||||
vm = VM(mainprogram, timerprogram)
|
||||
vm.enable_charscreen(0x0400, 40, 25)
|
||||
vm.run()
|
||||
|
@ -13,6 +13,7 @@ class Opcode(enum.IntEnum):
|
||||
POP3 = 15
|
||||
DUP = 16
|
||||
DUP2 = 17
|
||||
SWAP = 18
|
||||
ADD = 50
|
||||
SUB = 51
|
||||
MUL = 52
|
||||
|
80
tinyvm/vm.py
80
tinyvm/vm.py
@ -20,7 +20,7 @@
|
||||
# read [byte/bytearray from keyboard],
|
||||
# wait [till any input comes available], @todo
|
||||
# check [if input is available) @todo
|
||||
# or via memory-mapped I/O (text screen matrix, keyboard scan register) @todo
|
||||
# or via memory-mapped I/O (text screen matrix, keyboard scan register @todo)
|
||||
#
|
||||
# CPU: stack based execution, no registers.
|
||||
# unlimited dynamic variables (v0, v1, ...) that have a value and a type.
|
||||
@ -63,6 +63,8 @@ import collections
|
||||
import array
|
||||
import threading
|
||||
import pprint
|
||||
import tkinter
|
||||
import tkinter.font
|
||||
from typing import Dict, List, Tuple, Union
|
||||
from il65.emit import mflpt5_to_float, to_mflpt5
|
||||
from .program import Instruction, Variable, Block, Program, Opcode
|
||||
@ -91,6 +93,9 @@ class Memory:
|
||||
def get_byte(self, index: int) -> int:
|
||||
return self.mem[index]
|
||||
|
||||
def get_bytes(self, startindex: int, amount: int) -> int:
|
||||
return self.mem[startindex: startindex+amount]
|
||||
|
||||
def get_sbyte(self, index: int) -> int:
|
||||
return 256 - self.mem[index]
|
||||
|
||||
@ -222,8 +227,9 @@ class VM:
|
||||
timer_irq_interlock = threading.Lock()
|
||||
timer_irq_event = threading.Event()
|
||||
|
||||
def __init__(self, program: Program, timerprogram: Program=Program([])) -> None:
|
||||
def __init__(self, program: Program, timerprogram: Program=None) -> None:
|
||||
opcode_names = [oc.name for oc in Opcode]
|
||||
timerprogram = timerprogram or Program([])
|
||||
for ocname in opcode_names:
|
||||
if not hasattr(self, "opcode_" + ocname):
|
||||
raise NotImplementedError("missing opcode method for " + ocname)
|
||||
@ -245,6 +251,7 @@ class VM:
|
||||
self.program = self.main_program
|
||||
self.stack = self.main_stack
|
||||
self.pc = None # type: Instruction
|
||||
self.charscreen = None
|
||||
self.system = System(self)
|
||||
assert all(i.next for i in self.main_program
|
||||
if i.opcode != Opcode.TERMINATE), "main: all instrs next must be set"
|
||||
@ -257,6 +264,9 @@ class VM:
|
||||
threading.Thread(target=self.timer_irq, name="timer_irq", daemon=True).start()
|
||||
print("[TinyVM starting up.]")
|
||||
|
||||
def enable_charscreen(self, screen_address: int, width: int, height: int) -> None:
|
||||
self.charscreen = (screen_address, width, height)
|
||||
|
||||
def flatten_programs(self, main: Program, timer: Program) \
|
||||
-> Tuple[List[Instruction], List[Instruction], Dict[str, Variable], Dict[str, Instruction]]:
|
||||
variables = {} # type: Dict[str, Variable]
|
||||
@ -322,6 +332,9 @@ class VM:
|
||||
i.next = nexti
|
||||
|
||||
def run(self) -> None:
|
||||
if self.charscreen:
|
||||
threading.Thread(target=ScreenViewer.create, args=(self.memory, self.system, self.charscreen), name="screenviewer", daemon=True).start()
|
||||
|
||||
self.pc = self.program[0] # first instruction of the main program
|
||||
self.stack.push(CallFrameMarker(None)) # enter the call frame so the timer program can end with a RETURN
|
||||
try:
|
||||
@ -342,11 +355,15 @@ class VM:
|
||||
print("[TinyVM execution ended.]")
|
||||
|
||||
def timer_irq(self) -> None:
|
||||
# This is the timer 'irq' handler. It runs the timer program at a certain interval.
|
||||
# NOTE: executing the timer program will LOCK the main program and vice versa!
|
||||
# (because the VM is a strictly single threaded machine)
|
||||
resolution = 1/30
|
||||
wait_time = resolution
|
||||
while True:
|
||||
self.timer_irq_event.wait(wait_time)
|
||||
self.timer_irq_event.clear()
|
||||
print("....timer irq", wait_time, time.time()) # XXX
|
||||
start = time.perf_counter()
|
||||
if self.timer_program:
|
||||
with self.timer_irq_interlock:
|
||||
@ -408,6 +425,11 @@ class VM:
|
||||
self.stack.push(x)
|
||||
return True
|
||||
|
||||
def opcode_SWAP(self, instruction: Instruction) -> bool:
|
||||
value2, value1 = self.stack.pop2()
|
||||
self.stack.push2(value2, value1)
|
||||
return True
|
||||
|
||||
def opcode_PUSH2(self, instruction: Instruction) -> bool:
|
||||
value1 = self.variables[instruction.args[0]].value
|
||||
value2 = self.variables[instruction.args[1]].value
|
||||
@ -561,6 +583,7 @@ class VM:
|
||||
Opcode.POP3: opcode_POP3,
|
||||
Opcode.DUP: opcode_DUP,
|
||||
Opcode.DUP2: opcode_DUP2,
|
||||
Opcode.SWAP: opcode_SWAP,
|
||||
Opcode.ADD: opcode_ADD,
|
||||
Opcode.SUB: opcode_SUB,
|
||||
Opcode.MUL: opcode_MUL,
|
||||
@ -588,16 +611,16 @@ class System:
|
||||
def __init__(self, vm: VM) -> None:
|
||||
self.vm = vm
|
||||
|
||||
def _encodestr(self, string: str, alt: bool=False) -> bytearray:
|
||||
def encodestr(self, string: str, alt: bool=False) -> bytearray:
|
||||
return bytearray(string, self.vm.str_alt_encoding if alt else self.vm.str_encoding)
|
||||
|
||||
def _decodestr(self, bb: Union[bytearray, array.array], alt: bool=False) -> str:
|
||||
def decodestr(self, bb: Union[bytearray, array.array], alt: bool=False) -> str:
|
||||
return str(bb, self.vm.str_alt_encoding if alt else self.vm.str_encoding) # type: ignore
|
||||
|
||||
def syscall_printstr(self) -> bool:
|
||||
value = self.vm.stack.pop()
|
||||
if isinstance(value, (bytearray, array.array)):
|
||||
print(self._decodestr(value), end="")
|
||||
print(self.decodestr(value), end="")
|
||||
return True
|
||||
else:
|
||||
raise TypeError("printstr expects bytearray", value)
|
||||
@ -605,23 +628,23 @@ class System:
|
||||
def syscall_printchr(self) -> bool:
|
||||
character = self.vm.stack.pop()
|
||||
if isinstance(character, int):
|
||||
print(self._decodestr(bytearray([character])), end="")
|
||||
print(self.decodestr(bytearray([character])), end="")
|
||||
return True
|
||||
else:
|
||||
raise TypeError("printchr expects integer (1 char)", character)
|
||||
|
||||
def syscall_input(self) -> bool:
|
||||
self.vm.stack.push(self._encodestr(input()))
|
||||
self.vm.stack.push(self.encodestr(input()))
|
||||
return True
|
||||
|
||||
def syscall_getchr(self) -> bool:
|
||||
self.vm.stack.push(self._encodestr(input() + '\n')[0])
|
||||
self.vm.stack.push(self.encodestr(input() + '\n')[0])
|
||||
return True
|
||||
|
||||
def syscall_decimalstr_signed(self) -> bool:
|
||||
value = self.vm.stack.pop()
|
||||
if type(value) is int:
|
||||
self.vm.stack.push(self._encodestr(str(value)))
|
||||
self.vm.stack.push(self.encodestr(str(value)))
|
||||
return True
|
||||
else:
|
||||
raise TypeError("decimalstr expects int", value)
|
||||
@ -633,7 +656,7 @@ class System:
|
||||
strvalue = "${:x}".format(value)
|
||||
else:
|
||||
strvalue = "-${:x}".format(-value) # type: ignore
|
||||
self.vm.stack.push(self._encodestr(strvalue))
|
||||
self.vm.stack.push(self.encodestr(strvalue))
|
||||
return True
|
||||
else:
|
||||
raise TypeError("hexstr expects int", value)
|
||||
@ -668,3 +691,40 @@ class System:
|
||||
for i, b in enumerate(strbytes): # type: ignore
|
||||
self.vm.memory.set_byte(address+i, b) # type: ignore
|
||||
return True
|
||||
|
||||
def syscall_smalldelay(self) -> bool:
|
||||
time.sleep(1/100)
|
||||
return True
|
||||
|
||||
def syscall_delay(self) -> bool:
|
||||
time.sleep(0.5)
|
||||
return True
|
||||
|
||||
|
||||
class ScreenViewer(tkinter.Tk):
|
||||
def __init__(self, memory: Memory, system: System, screenconfig: Tuple[int, int, int]) -> None:
|
||||
super().__init__()
|
||||
self.fontsize = 14
|
||||
self.memory = memory
|
||||
self.system = system
|
||||
self.address = screenconfig[0]
|
||||
self.width = screenconfig[1]
|
||||
self.height = screenconfig[2]
|
||||
self.monospace = tkinter.font.Font(self, family="Courier", weight="bold", size=self.fontsize)
|
||||
cw = self.monospace.measure("x")*self.width+8
|
||||
self.canvas = tkinter.Canvas(self, width=cw, height=self.fontsize*self.height+8, bg="blue")
|
||||
self.canvas.pack()
|
||||
self.after(10, self.update_screen)
|
||||
|
||||
def update_screen(self):
|
||||
self.canvas.delete(tkinter.ALL)
|
||||
for y in range(self.height):
|
||||
line = self.memory.get_bytes(self.address+y*self.width, self.width)
|
||||
text = self.system.decodestr(line)
|
||||
self.canvas.create_text(4, self.fontsize*y, text=text, fill="white", font=self.monospace, anchor=tkinter.NW)
|
||||
self.after(10, self.update_screen)
|
||||
|
||||
@classmethod
|
||||
def create(cls, memory: Memory, system: System, screenconfig: Tuple[int, int, int]) -> None:
|
||||
viewer = cls(memory, system, screenconfig)
|
||||
viewer.mainloop()
|
||||
|
Loading…
Reference in New Issue
Block a user