timer 'irq' in vm

This commit is contained in:
Irmen de Jong 2018-02-27 22:16:45 +01:00
parent e9b5bc8200
commit 13273ce6cc
5 changed files with 178 additions and 139 deletions

View File

@ -1,50 +1,35 @@
; source code for a tinyvm program ; source code for a tinyvm program
%block b1 %block b1
%vardefs %vardefs
var byte vb 1 var byte teller 1
var word vw 2233 const byte one 1
var sbyte vbs -2 const byte thousand 1000
var sword vws -3344 var array_byte newlinestr 1 10
var float vf 1.234
var array_byte ab 3 1
var array_sbyte asb 3 -2
var array_word aw 3 4455
var array_sword asw 3 -5566
var matrix_byte mb 2 3 1
var matrix_sbyte msb 2 3 -2
var array_byte ab2 5 1
var array_sbyte asb2 5 -2
var array_word aw2 5 6677
var array_sword asw2 5 -8899
var matrix_byte mb2 4 5 1
var matrix_sbyte msb2 4 5 -2
%end_vardefs %end_vardefs
%instructions %instructions
push vw push teller
pop vb syscall decimalstr_signed
terminate syscall printstr
push newlinestr
syscall printstr
back:
push teller
push one
add
pop teller
push teller
syscall decimalstr_signed
syscall printstr
push newlinestr
syscall printstr
push teller
push thousand
cmp_lt
jump_if_true back
return
%end_instructions %end_instructions
%subblocks %subblocks
%block b2
%vardefs
%end_vardefs
%end_block ; b2
%end_subblocks %end_subblocks
%end_block ;b1 %end_block ;b1
%block b3
%vardefs
%end_vardefs
%instructions
nop
nop
l2:
return
%end_instructions
%end_block ; b3

View File

@ -3,36 +3,34 @@ from typing import Any, List, Dict, Optional
class Opcode(enum.IntEnum): class Opcode(enum.IntEnum):
NOP = 0 TERMINATE = 0
TERMINATE = 1 NOP = 1
PUSH = 10 PUSH = 10
PUSH2 = 11 PUSH2 = 11
PUSH3 = 12 PUSH3 = 12
POP = 20 POP = 13
POP2 = 21 POP2 = 14
POP3 = 22 POP3 = 15
ADD = 100 ADD = 50
SUB = 101 SUB = 51
MUL = 102 MUL = 52
DIV = 103 DIV = 53
AND = 200 AND = 70
OR = 201 OR = 71
XOR = 202 XOR = 72
NOT = 203 NOT = 73
CMP_EQ = 300 TEST = 100
CMP_LT = 301 CMP_EQ = 101
CMP_GT = 302 CMP_LT = 102
CMP_LTE = 303 CMP_GT = 103
CMP_GTE = 304 CMP_LTE = 104
TEST = 305 CMP_GTE = 105
RETURN = 500 CALL = 200
JUMP = 501 RETURN = 201
JUMP_IF_TRUE = 502 JUMP = 202
JUMP_IF_FALSE = 503 JUMP_IF_TRUE = 203
SYSCALL = 504 JUMP_IF_FALSE = 204
SYSCALL = 205
CONDITIONAL_OPCODES = {Opcode.JUMP_IF_FALSE, Opcode.JUMP_IF_TRUE}
class DataType(enum.IntEnum): class DataType(enum.IntEnum):
@ -63,13 +61,13 @@ class Variable:
class Instruction: class Instruction:
__slots__ = ["opcode", "args", "next", "condnext"] __slots__ = ["opcode", "args", "next", "alt_next"]
def __init__(self, opcode: Opcode, args: List[Any], nxt: Optional['Instruction'], condnxt: Optional['Instruction']) -> None: def __init__(self, opcode: Opcode, args: List[Any], nxt: Optional['Instruction'], alt_next: Optional['Instruction']) -> None:
self.opcode = opcode self.opcode = opcode
self.args = args self.args = args
self.next = nxt # regular next statement, None=end self.next = nxt # regular next statement, None=end
self.condnext = condnxt # alternate next statement (for condition nodes) self.alt_next = alt_next # alternate next statement (for condition nodes, and return instruction for call nodes)
def __str__(self) -> str: def __str__(self) -> str:
return "<Instruction {} args: {}>".format(self.opcode.name, self.args) return "<Instruction {} args: {}>".format(self.opcode.name, self.args)

View File

@ -1,6 +1,6 @@
import sys import sys
from .parse import Parser from .parse import Parser
from .core import Program from .core import Program, Opcode, Block, Instruction
from .vm import VM from .vm import VM
@ -8,8 +8,10 @@ source = open(sys.argv[1]).read()
parser = Parser(source) parser = Parser(source)
program = parser.parse() program = parser.parse()
timerprogram = Program([]) timerprogram = Program([Block("timer", None, [], [
Instruction(Opcode.RETURN, [], None, None)
], {}, [])])
# zero page and hardware stack of a 6502 cpu are off limits for now # zero page and hardware stack of a 6502 cpu are off limits for now
VM.readonly_mem_ranges = [(0x00, 0xff), (0x100, 0x1ff), (0xa000, 0xbfff), (0xe000, 0xffff)] VM.readonly_mem_ranges = [(0x00, 0xff), (0x100, 0x1ff), (0xa000, 0xbfff), (0xe000, 0xffff)]
vm = VM(program, timerprogram) vm = VM(program)
vm.run() vm.run()

View File

@ -125,10 +125,13 @@ class Parser:
label = line[:-1].rstrip() label = line[:-1].rstrip()
self.lineno += 1 self.lineno += 1
line = self.source[self.lineno] line = self.source[self.lineno]
labels[label] = parse_instruction(line) next_instruction = parse_instruction(line)
labels[label] = next_instruction
instructions.append(next_instruction)
self.lineno += 1
else: else:
instructions.append(parse_instruction(line)) instructions.append(parse_instruction(line))
self.lineno += 1 self.lineno += 1
self.skip_empty() self.skip_empty()
assert self.source[self.lineno].startswith("%end_instructions") assert self.source[self.lineno].startswith("%end_instructions")
self.lineno += 1 self.lineno += 1

View File

@ -51,17 +51,19 @@
# call function (arguments are on stack) # call function (arguments are on stack)
# enter / exit (function call frame) # enter / exit (function call frame)
# #
# TIMER INTERRUPT: triggered every 1/60th of a second. # TIMER INTERRUPT: triggered around each 1/30th of a second.
# executes on a DIFFERENT stack and with a different PROGRAM LIST, # executes on a DIFFERENT stack and with a different PROGRAM LIST,
# but with access to ALL THE SAME DYNAMIC VARIABLES. # but with access to ALL THE SAME DYNAMIC VARIABLES.
# This suspends the main program until the timer program RETURNs!
# #
import time import time
import itertools import itertools
import collections import collections
import array import array
import threading
import pprint import pprint
from .core import Instruction, Variable, Block, Program, Opcode, CONDITIONAL_OPCODES from .core import Instruction, Variable, Block, Program, Opcode
from typing import Dict, List, Tuple, Union from typing import Dict, List, Tuple, Union
from il65.emit import mflpt5_to_float, to_mflpt5 from il65.emit import mflpt5_to_float, to_mflpt5
@ -131,7 +133,18 @@ class Memory:
self.mem[index: index+5] = to_mflpt5(value) self.mem[index: index+5] = to_mflpt5(value)
StackValueType = Union[bool, int, float, bytearray, array.array] class CallFrameMarker:
pass
class ReturnInstruction:
__slots__ = ["instruction"]
def __init__(self, instruction: Instruction) -> None:
self.instruction = instruction
StackValueType = Union[bool, int, float, bytearray, array.array, CallFrameMarker, ReturnInstruction]
class Stack: class Stack:
@ -188,8 +201,8 @@ class Stack:
self.stack[-2] = x self.stack[-2] = x
def _typecheck(self, value: StackValueType): def _typecheck(self, value: StackValueType):
if type(value) not in (bool, int, float, bytearray, array.array): if type(value) not in (bool, int, float, bytearray, array.array, CallFrameMarker, ReturnInstruction):
raise TypeError("stack can only contain bool, int, float, (byte)array") raise TypeError("invalid item type pushed")
# noinspection PyPep8Naming,PyUnusedLocal,PyMethodMayBeStatic # noinspection PyPep8Naming,PyUnusedLocal,PyMethodMayBeStatic
@ -197,8 +210,11 @@ class VM:
str_encoding = "iso-8859-15" str_encoding = "iso-8859-15"
str_alt_encoding = "iso-8859-15" str_alt_encoding = "iso-8859-15"
readonly_mem_ranges = [] # type: List[Tuple[int, int]] readonly_mem_ranges = [] # type: List[Tuple[int, int]]
timer_irq_resolution = 1/30
timer_irq_interlock = threading.Lock()
timer_irq_event = threading.Event()
def __init__(self, program: Program, timerprogram: Program) -> None: def __init__(self, program: Program, timerprogram: Program=Program([])) -> None:
opcode_names = [oc.name for oc in Opcode] opcode_names = [oc.name for oc in Opcode]
for ocname in opcode_names: for ocname in opcode_names:
if not hasattr(self, "opcode_" + ocname): if not hasattr(self, "opcode_" + ocname):
@ -212,33 +228,39 @@ class VM:
self.memory.mark_readonly(start, end) self.memory.mark_readonly(start, end)
self.main_stack = Stack() self.main_stack = Stack()
self.timer_stack = Stack() self.timer_stack = Stack()
(self.main_program, self.timer_program), self.variables, self.labels = self.flatten_programs(program, timerprogram) self.main_program, self.timer_program, self.variables, self.labels = self.flatten_programs(program, timerprogram)
print("MAIN PROGRAM"); pprint.pprint([str(i) for i in self.main_program])
self.connect_instruction_pointers(self.main_program) self.connect_instruction_pointers(self.main_program)
self.connect_instruction_pointers(self.timer_program) self.connect_instruction_pointers(self.timer_program)
self.program = self.main_program self.program = self.main_program
self.stack = self.main_stack self.stack = self.main_stack
self.pc = None # type: Instruction self.pc = None # type: Instruction
self.previous_pc = None # type: Instruction
self.system = System(self) self.system = System(self)
assert all(i.next for i in self.main_program assert all(i.next for i in self.main_program
if i.opcode != Opcode.TERMINATE), "main: all instrs next must be set" if i.opcode != Opcode.TERMINATE), "main: all instrs next must be set"
assert all(i.next for i in self.timer_program assert all(i.next for i in self.timer_program
if i.opcode != Opcode.TERMINATE), "main: all instrs next must be set" if i.opcode not in (Opcode.TERMINATE, Opcode.RETURN)), "timer: all instrs next must be set"
assert all(i.condnext for i in self.main_program assert all(i.alt_next for i in self.main_program
if i.opcode in CONDITIONAL_OPCODES), "timer: all conditional instrs condnext must be set" if i.opcode in (Opcode.CALL, Opcode.JUMP_IF_FALSE, Opcode.JUMP_IF_TRUE)), "main: alt_nexts must be set"
assert all(i.condnext for i in self.timer_program assert all(i.alt_next for i in self.timer_program
if i.opcode in CONDITIONAL_OPCODES), "timer: all conditional instrs condnext must be set" if i.opcode in (Opcode.CALL, Opcode.JUMP_IF_FALSE, Opcode.JUMP_IF_TRUE)), "timer: alt_nexts must be set"
threading.Thread(target=self.timer_irq, name="timer_irq", daemon=True).start()
print("[TinyVM starting up.]") print("[TinyVM starting up.]")
def flatten_programs(self, *programs: Program) -> Tuple[List[List[Instruction]], Dict[str, Variable], Dict[str, Instruction]]: def flatten_programs(self, main: Program, timer: Program) \
variables = {} # type: Dict[str, Variable] -> Tuple[List[Instruction], List[Instruction], Dict[str, Variable], Dict[str, Instruction]]:
labels = {} # type: Dict[str, Instruction] variables = {} # type: Dict[str, Variable]
flat_programs = [] # type: List[List[Instruction]] labels = {} # type: Dict[str, Instruction]
for program in programs: instructions_main = [] # type: List[Instruction]
for block in program.blocks: instructions_timer = [] # type: List[Instruction]
flat = self.flatten(block, variables, labels) for block in main.blocks:
flat_programs.append(flat) flat = self.flatten(block, variables, labels)
return flat_programs, variables, labels instructions_main.extend(flat)
instructions_main.append(Instruction(Opcode.TERMINATE, [], None, None))
for block in timer.blocks:
flat = self.flatten(block, variables, labels)
instructions_timer.extend(flat)
return instructions_main, instructions_timer, variables, labels
def flatten(self, block: Block, variables: Dict[str, Variable], labels: Dict[str, Instruction]) -> List[Instruction]: def flatten(self, block: Block, variables: Dict[str, Variable], labels: Dict[str, Instruction]) -> List[Instruction]:
def block_prefix(b: Block) -> str: def block_prefix(b: Block) -> str:
@ -269,7 +291,6 @@ class VM:
labels[name] = instr labels[name] = instr
for subblock in block.blocks: for subblock in block.blocks:
instructions.extend(self.flatten(subblock, variables, labels)) instructions.extend(self.flatten(subblock, variables, labels))
instructions.append(Instruction(Opcode.TERMINATE, [], None, None))
del block.instructions del block.instructions
del block.variables del block.variables
del block.labels del block.labels
@ -279,42 +300,65 @@ class VM:
i1, i2 = itertools.tee(instructions) i1, i2 = itertools.tee(instructions)
next(i2, None) next(i2, None)
for i, nexti in itertools.zip_longest(i1, i2): for i, nexti in itertools.zip_longest(i1, i2):
i.next = nexti if i.opcode in (Opcode.JUMP_IF_TRUE, Opcode.JUMP_IF_FALSE):
if i.opcode in CONDITIONAL_OPCODES: i.next = nexti # normal flow target
i.condnext = self.labels[i.args[0]] i.alt_next = self.labels[i.args[0]] # conditional jump target
elif i.opcode == Opcode.JUMP:
i.next = self.labels[i.args[0]] # jump target
elif i.opcode == Opcode.CALL:
i.next = self.labels[i.args[0]] # call target
i.alt_next = nexti # return instruction
else:
i.next = nexti
def run(self) -> None: def run(self) -> None:
last_timer = time.time()
self.pc = self.program[0] # first instruction of the main program self.pc = self.program[0] # first instruction of the main program
steps = 1 self.stack.push(ReturnInstruction(None)) # sentinel
self.stack.push(CallFrameMarker()) # enter the call frame so the timer program can end with a RETURN
try: try:
while True: while self.pc is not None:
now = time.time() with self.timer_irq_interlock:
# if now - last_timer >= 1/60: next_pc = getattr(self, "opcode_" + self.pc.opcode.name)(self.pc)
# last_timer = now if next_pc:
# # start running the timer interrupt program instead self.pc = self.pc.next
# self.previous_pc = self.pc
# self.program = self.timer_program
# self.stack = self.timer_stack
# self.pc = 0
# while True:
# self.dispatch(self.program[self.pc])
# return True
# self.pc = self.previous_pc
# self.program = self.mainprogram
# self.stack = self.mainstack
next_pc = getattr(self, "opcode_" + self.pc.opcode.name)(self.pc)
if next_pc:
self.pc = self.pc.next
steps += 1
except TerminateExecution as x: except TerminateExecution as x:
why = str(x) why = str(x)
print("[TinyVM execution halted{:s}]\n".format(": "+why if why else ".")) print("[TinyVM execution terminated{:s}]\n".format(": "+why if why else "."))
return return
except Exception as x: except Exception as x:
print("EXECUTION ERROR") print("EXECUTION ERROR")
self.debug_stack(5) self.debug_stack(5)
raise raise
else:
print("[TinyVM execution ended.]")
def timer_irq(self) -> None:
resolution = 1/30
wait_time = resolution
while True:
self.timer_irq_event.wait(wait_time)
self.timer_irq_event.clear()
start = time.perf_counter()
if self.timer_program:
with self.timer_irq_interlock:
previous_pc = self.pc
previous_program = self.program
previous_stack = self.stack
self.stack = self.timer_stack
self.program = self.timer_program
self.pc = self.program[0]
self.stack.push(ReturnInstruction(None)) # sentinel
self.stack.push(CallFrameMarker()) # enter the call frame so the timer program can end with a RETURN
while self.pc is not None:
next_pc = getattr(self, "opcode_" + self.pc.opcode.name)(self.pc)
if next_pc:
self.pc = self.pc.next
self.pc = previous_pc
self.program = previous_program
self.stack = previous_stack
current = time.perf_counter()
duration, previously = current - start, current
wait_time = max(0, resolution - duration)
def debug_stack(self, size: int=5) -> None: def debug_stack(self, size: int=5) -> None:
stack = self.stack.debug_peek(size) stack = self.stack.debug_peek(size)
@ -424,15 +468,15 @@ class VM:
self.stack.push(not self.stack.pop()) self.stack.push(not self.stack.pop())
return True return True
def opcode_TEST(self, instruction: Instruction) -> bool:
self.stack.push(bool(self.stack.pop()))
return True
def opcode_CMP_EQ(self, instruction: Instruction) -> bool: def opcode_CMP_EQ(self, instruction: Instruction) -> bool:
second, first = self.stack.pop2() second, first = self.stack.pop2()
self.stack.push(first == second) self.stack.push(first == second)
return True return True
def opcode_TEST(self, instruction: Instruction) -> bool:
self.stack.push(bool(self.stack.pop()))
return True
def opcode_CMP_LT(self, instruction: Instruction) -> bool: def opcode_CMP_LT(self, instruction: Instruction) -> bool:
second, first = self.stack.pop2() second, first = self.stack.pop2()
self.stack.push(first < second) # type: ignore self.stack.push(first < second) # type: ignore
@ -453,29 +497,36 @@ class VM:
self.stack.push(first >= second) # type: ignore self.stack.push(first >= second) # type: ignore
return True return True
def opcode_RETURN(self, instruction: Instruction) -> bool: def opcode_CALL(self, instruction: Instruction) -> bool:
# returns from the current function call self.stack.push(ReturnInstruction(instruction.alt_next))
# any return values have already been pushed on the stack self.stack.push(CallFrameMarker())
raise NotImplementedError("return")
def opcode_JUMP(self, instruction: Instruction) -> bool:
# jumps unconditionally by resetting the PC to the given instruction index value
return True return True
def opcode_RETURN(self, instruction: Instruction) -> bool:
# unwind the function call frame
item = self.stack.pop()
while not isinstance(item, CallFrameMarker):
item = self.stack.pop()
returninstruction = self.stack.pop()
assert isinstance(returninstruction, ReturnInstruction)
self.pc = returninstruction.instruction
return False
def opcode_JUMP(self, instruction: Instruction) -> bool:
return True # jump simply points to the next instruction elsewhere
def opcode_JUMP_IF_TRUE(self, instruction: Instruction) -> bool: def opcode_JUMP_IF_TRUE(self, instruction: Instruction) -> bool:
# pops stack and jumps if that value is true, by resetting the PC to the given instruction index value
result = self.stack.pop() result = self.stack.pop()
if result: if result:
self.pc = self.pc.condnext self.pc = self.pc.alt_next # alternative next instruction
return False return False
return True return True
def opcode_JUMP_IF_FALSE(self, instruction: Instruction) -> bool: def opcode_JUMP_IF_FALSE(self, instruction: Instruction) -> bool:
# pops stack and jumps if that value is false, by resetting the PC to the given instruction index value
result = self.stack.pop() result = self.stack.pop()
if result: if result:
return True return True
self.pc = self.pc.condnext self.pc = self.pc.alt_next # alternative next instruction
return False return False
def opcode_SYSCALL(self, instruction: Instruction) -> bool: def opcode_SYSCALL(self, instruction: Instruction) -> bool:
@ -498,7 +549,7 @@ class System:
def syscall_printstr(self) -> bool: def syscall_printstr(self) -> bool:
value = self.vm.stack.pop() value = self.vm.stack.pop()
if isinstance(value, bytearray): if isinstance(value, (bytearray, array.array)):
print(self._decodestr(value), end="") print(self._decodestr(value), end="")
return True return True
else: else: