diff --git a/movie.py b/movie.py index 0be899c..758c302 100644 --- a/movie.py +++ b/movie.py @@ -26,8 +26,6 @@ class Movie: self.video.update_priority ) - self._last_op = opcodes.Nop() - def encode(self) -> Iterator[opcodes.Opcode]: video_frames = self.video.frames() video_seq = None @@ -50,10 +48,9 @@ class Movie: def _emit_bytes(self, _op): # print("%04X:" % self.stream_pos) - for b in self.state.emit(self._last_op, _op): + for b in self.state.emit(_op): yield b self.stream_pos += 1 - self._last_op = _op def emit_stream(self, ops: Iterable[opcodes.Opcode]) -> Iterator[int]: for op in ops: @@ -67,8 +64,6 @@ class Movie: for _ in range(nops): yield from self._emit_bytes(opcodes.Nop()) yield from self._emit_bytes(opcodes.Ack()) - # Ack falls through to nop - self._last_op = opcodes.Nop() yield from self._emit_bytes(op) def done(self) -> Iterator[int]: diff --git a/opcodes.py b/opcodes.py index c89bd8f..090d36b 100644 --- a/opcodes.py +++ b/opcodes.py @@ -29,8 +29,8 @@ class State: self.cycle_counter = cycle_counter self.update_priority = update_priority - def emit(self, last_opcode: "Opcode", opcode: "Opcode") -> Iterator[int]: - cmd = opcode.emit_command(last_opcode, opcode) + def emit(self, opcode: "Opcode") -> Iterator[int]: + cmd = opcode.emit_command(opcode) if cmd: yield from cmd data = opcode.emit_data() @@ -45,11 +45,6 @@ class State: _op_cmds = [ - "STORE", - "SET_CONTENT", # # set new data byte to write - "SET_PAGE", - "RLE", - "TICK", "TERMINATE", "NOP", "ACK", @@ -71,9 +66,6 @@ class Opcode: # Offset of last byte in decoder opcode _END = None # type: int - # Opcode uses relative addressing to branch to next opcode - _RELATIVE_BRANCH = False - def __repr__(self): return "Opcode(%s)" % self.COMMAND.name @@ -91,18 +83,10 @@ class Opcode: return self._CYCLES @staticmethod - def emit_command(last_opcode: "Opcode", - opcode: "Opcode") -> Iterator[int]: - # Compute offset from last opcode's terminating BRA instruction to - # first instruction of this opcode. - if last_opcode._RELATIVE_BRANCH: - offset = (opcode._START - last_opcode._END - 1) & 0xff - - # print("%s -> %s = %02x" % (last_opcode, opcode, offset)) - yield offset - else: - yield opcode._START >> 8 - yield opcode._START & 0xff + def emit_command(opcode: "Opcode") -> Iterator[int]: + # Emit address of next opcode + yield opcode._START >> 8 + yield opcode._START & 0xff def emit_data(self) -> Iterator[int]: return @@ -113,152 +97,15 @@ class Opcode: class Nop(Opcode): COMMAND = OpcodeCommand.NOP - _CYCLES = 11 + _CYCLES = 11 # TODO: count def __data_eq__(self, other): return True -class Store(Opcode): - COMMAND = OpcodeCommand.STORE - _CYCLES = 20 - - _RELATIVE_BRANCH = True - - def __init__(self, offset: int): - if offset < 0 or offset > 255: - raise ValueError("Invalid offset: %d" % offset) - - self.offset = offset - - def __repr__(self): - return "Opcode(%s, %02x)" % ( - self.COMMAND.name, self.offset) - - def __data_eq__(self, other): - return self.offset == other.offset - - def emit_data(self): - # print(" Store @ %02x" % self.offset) - yield self.offset - - def apply(self, state): - state.memmap.write(state.page, self.offset, state.content) - # TODO: screen page - state.update_priority[state.page - 32, self.offset] = 0 - - -class SetContent(Opcode): - COMMAND = OpcodeCommand.SET_CONTENT - _CYCLES = 15 - - _RELATIVE_BRANCH = True - - def __init__(self, content: int): - self.content = content - - def __repr__(self): - return "Opcode(%s, %02x)" % ( - self.COMMAND.name, self.content) - - def __data_eq__(self, other): - return self.content == other.content - - def emit_data(self): - yield self.content - - def apply(self, state: State): - # print(" Set content %02x" % self.content) - state.content = self.content - - -class SetPage(Opcode): - COMMAND = OpcodeCommand.SET_PAGE - _CYCLES = 23 - - _RELATIVE_BRANCH = True - - def __init__(self, page: int): - self.page = page - - def __repr__(self): - return "Opcode(%s, %02x)" % ( - self.COMMAND.name, self.page) - - def __data_eq__(self, other): - return self.page == other.page - - def emit_data(self): - yield self.page - - def apply(self, state: State): - # print(" Set page %02x" % self.page) - state.page = self.page - - -class RLE(Opcode): - COMMAND = OpcodeCommand.RLE - _CYCLES = 22 - - _RELATIVE_BRANCH = True - - def __init__(self, start_offset: int, run_length: int): - self.start_offset = start_offset - self.run_length = run_length - - def __repr__(self): - return "Opcode(%s, %02x, %02x)" % ( - self.COMMAND.name, self.start_offset, self.run_length) - - def __data_eq__(self, other): - return ( - self.start_offset == other.start_offset and - self.run_length == other.run_length) - - def emit_data(self): - # print(" RLE @ %02x * %02x" % (self.start_offset, self.run_length)) - yield self.start_offset - yield self.run_length - 1 - - @property - def cycles(self): - return 22 + 10 * self.run_length - - def apply(self, state): - for i in range(self.run_length): - offset = (self.start_offset + i) & 0xff - state.memmap.write(state.page, offset, state.content) - # TODO: screen page - state.update_priority[state.page - 32, offset] = 0 - - -class Tick(Opcode): - COMMAND = OpcodeCommand.TICK - - _RELATIVE_BRANCH = True - - def __init__(self, cycles: int): - self._START -= (cycles - 15) // 2 - self._cycles = cycles - - def __repr__(self): - return "Opcode(%s, %02x)" % ( - self.COMMAND.name, self.cycles) - - def __data_eq__(self, other): - return self._cycles == other._cycles - - @property - def cycles(self): - return self._cycles - - def emit_data(self): - print(" Tick @ %02x" % self.cycles) - - class Terminate(Opcode): COMMAND = OpcodeCommand.TERMINATE - _CYCLES = 6 + _CYCLES = 6 # TODO: count def __data_eq__(self, other): return True @@ -266,7 +113,7 @@ class Terminate(Opcode): class Ack(Opcode): COMMAND = OpcodeCommand.ACK - _CYCLES = 100 # XXX todo + _CYCLES = 100 # TODO: count def __data_eq__(self, other): return True @@ -344,12 +191,7 @@ def _FillOpcodeAddresses(): _OPCODE_ADDRS = _ParseSymbolTable() _OPCODE_CLASSES = { - # OpcodeCommand.STORE: Store, - # OpcodeCommand.SET_CONTENT: SetContent, - # OpcodeCommand.SET_PAGE: SetPage, - # OpcodeCommand.RLE: RLE, - # OpcodeCommand.TICK: Tick, - # OpcodeCommand.TERMINATE: Terminate, + OpcodeCommand.TERMINATE: Terminate, OpcodeCommand.NOP: Nop, OpcodeCommand.ACK: Ack, } @@ -359,49 +201,3 @@ for _tick in range(4, 68, 2): _tickop = OpcodeCommand["TICK_%d_PAGE_%d" % (_tick, _page)] _OPCODE_CLASSES[_tickop] = TICK_OPCODES[(_tick, _page)] _FillOpcodeAddresses() - - -class Decoder: - def __init__(self, state: State): - self.state = state # type: State - - def decode_stream(self, stream: Iterator[int]) -> Tuple[int, int, int, int]: - """Replay an opcode stream to build a screen image.""" - num_content_changes = 0 - num_page_changes = 0 - num_content_stores = 0 - num_rle_bytes = 0 - - terminate = False - for b in stream: - if b == OpcodeCommand.SET_CONTENT.value: - content = next(stream) - op = SetContent(content) - num_content_changes += 1 - elif b == OpcodeCommand.SET_PAGE.value: - page = next(stream) - op = SetPage(page) - num_page_changes += 1 - elif b == OpcodeCommand.RLE.value: - offset = next(stream) - run_length = next(stream) - num_rle_bytes += run_length - op = RLE(offset, run_length) - elif b == OpcodeCommand.TICK.value: - cycles = next(stream) - op = Tick(cycles) - elif b == OpcodeCommand.TERMINATE.value: - op = Terminate() - terminate = True - else: - op = Store(b) - num_content_stores += 1 - - op.apply(self.state) - if terminate: - break - - return ( - num_content_stores, num_content_changes, num_page_changes, - num_rle_bytes - )