From c7978523243343391ca3690c7b761b43c36741e2 Mon Sep 17 00:00:00 2001 From: kris Date: Thu, 3 Jan 2019 17:38:47 +0000 Subject: [PATCH] - Stop masking out unchanged bytes explicitly and compare the full source vs target frame. This allows us to accumulate runs across unchanged bytes, if they happen to be the same content value. - introduce an allowable bit error when building runs, i.e. trade some slight imprecision for much more efficient decoding. This gives a slight (~2%) reduction in similarity on my test frames at 140 pixels but improves the 280 pixel similarity significantly (~7%) - so make 280 pixels the default for now - once the run is complete, compute the median value of each bit in the run and use that as content byte. I also tried mean which had exactly the same output - runs will sometimes now span the (0x7x) screen holes so for now just ignore invalid addresses in _write --- screen.py | 112 +++++++++++++++++++++++++++++++++++------------------- 1 file changed, 73 insertions(+), 39 deletions(-) diff --git a/screen.py b/screen.py index 3a08295..ac50d2a 100644 --- a/screen.py +++ b/screen.py @@ -43,7 +43,8 @@ class Opcode(enum.Enum): class Frame: """Bitmapped screen frame.""" - XMAX = 140 # 280 # double-wide pixels to not worry about colour effects + XMAX = 280 # 140 # double-wide pixels to not worry about colour + # effects YMAX = 192 def __init__(self, bitmap: np.array = None): @@ -93,7 +94,7 @@ class Screen: """ # Double each pixel horizontally - pixels = np.repeat(bitmap, 2, axis=1) + pixels = bitmap # np.repeat(bitmap, 2, axis=1) # Insert zero column after every 7 for i in range(pixels.shape[1] // 7 - 1, -1, -1): @@ -134,10 +135,6 @@ class Screen: # Target screen memory map for new frame target = self._encode(frame.bitmap) - # Compute difference from current frame - delta = np.bitwise_xor(self.screen, target) - delta = np.ma.masked_array(delta, np.logical_not(delta)) - # Estimate number of opcodes that will end up fitting in the cycle # budget. est_opcodes = int(cycle_budget / fullness / self.CYCLES[0]) @@ -145,44 +142,45 @@ class Screen: # Sort by highest xor weight and take the estimated number of change # operations changes = list( - sorted(self.index_changes(delta, target), reverse=True) + sorted(self.index_changes(self.screen, target), reverse=True) )[:est_opcodes] for b in self._heuristic_page_first_opcode_scheduler(changes): yield b - def index_changes(self, deltas: np.array, + def index_changes(self, source: np.array, target: np.array) -> Set[Tuple[int, int, int, int, int]]: """Transform encoded screen to sequence of change tuples. Change tuple is (xor_weight, page, offset, content) """ + # Compute difference from current frame + deltas = np.bitwise_xor(self.screen, target) + deltas = np.ma.masked_array(deltas, np.logical_not(deltas)) + changes = set() - - # Find runs in masked image - - memmap = defaultdict(lambda: [(None, None)] * 256) + # TODO: don't use 256 bytes if XMAX is smaller, or we may compute RLE + # over the full page! + memmap = defaultdict(lambda: [(0, 0, 0)] * 256) it = np.nditer(target, flags=['multi_index']) while not it.finished: y, x_byte = it.multi_index - # Skip masked values, i.e. unchanged in new frame - xor = deltas[y][x_byte] - if xor is np.ma.masked: - it.iternext() - continue y_base = self.Y_TO_BASE_ADDR[self.page][y] page = y_base >> 8 # print("y=%d -> page=%02x" % (y, page)) - xor_weight = hamming_weight(xor) offset = y_base - (page << 8) + x_byte - content = np.asscalar(it[0]) - memmap[page][offset] = (xor_weight, content) + src_content = source[y][x_byte] + target_content = np.asscalar(it[0]) + + bits_different = hamming_weight(src_content ^ target_content) + + memmap[page][offset] = (bits_different, src_content, target_content) it.iternext() for page, offsets in memmap.items(): @@ -190,31 +188,62 @@ class Screen: run_length = 0 maybe_run = [] for offset, data in enumerate(offsets): - xor_weight, content = data + bits_different, src_content, target_content = data - if cur_content != content and cur_content is not None: + # TODO: allowing odd bit errors introduces colour error + if maybe_run and hamming_weight( + cur_content ^ target_content) > 2: # End of run - if run_length >= 4: + + # Decide if it's worth emitting as a run vs single stores + + # Number of changes in run for which >0 bits differ + num_changes = len([c for c in maybe_run if c[0]]) + run_cost = self.CYCLES[Opcode.RLE] + run_length * 9 + single_cost = self.CYCLES[0] * num_changes + #print("Run of %d cheaper than %d singles" % ( + # run_length, num_changes)) + + # TODO: don't allow too much error to accumulate + + if run_cost < single_cost: + # Compute median bit value over run + median_bits = np.median( + np.vstack( + np.unpackbits( + np.array(r[3], dtype=np.uint8) + ) + for r in maybe_run + ), axis=0 + ) > 0.5 + + typical_content = np.asscalar(np.packbits(median_bits)) + total_xor = sum(ch[0] for ch in maybe_run) - change = (total_xor, page, offset - run_length, - cur_content, run_length) - #print("Found run of %d * %2x at %2x:%2x" % ( + start_offset = maybe_run[0][2] + + change = (total_xor, page, start_offset, + typical_content, run_length) + # print("Found run of %d * %2x at %2x:%2x" % ( # run_length, cur_content, page, offset - run_length) # ) + #print(maybe_run) + #print("change =", change) changes.add(change) else: - changes.update(ch for ch in maybe_run) + changes.update(ch for ch in maybe_run if ch[0]) maybe_run = [] run_length = 0 - cur_content = content + cur_content = target_content if cur_content is None: - cur_content = content + cur_content = target_content - if content is not None: - run_length += 1 - maybe_run.append((xor_weight, page, offset, content, 1)) - assert len(maybe_run) == run_length, (maybe_run, run_length) + run_length += 1 +# if bits_different != 0: +# # Only accumulate bytes for which src != target content + maybe_run.append( + (bits_different, page, offset, target_content, 1)) return changes @@ -233,11 +262,11 @@ class Screen: for b in self._emit(Opcode.SET_CONTENT, content): yield b - #print("page %d content %d offsets %s" % (page, content, + # print("page %d content %d offsets %s" % (page, content, # offsets)) for (run_length, offset) in sorted(offsets, reverse=True): if run_length > 1: - #print("Offset %d run length %d" % (offset, run_length)) + # print("Offset %d run length %d" % (offset, run_length)) for b in self._emit(Opcode.RLE, offset, run_length): yield b for i in range(run_length): @@ -357,7 +386,12 @@ class Screen: def _write(self, addr: int, val: int) -> None: """Updates screen image to set 0xaddr ^= val""" - _, y, x = self.ADDR_TO_COORDS[addr] + try: + _, y, x = self.ADDR_TO_COORDS[addr] + except KeyError: + # TODO: filter out screen holes + # print("Attempt to write to invalid offset %04x" % addr) + return self.screen[y][x] = val def to_bitmap(self) -> np.array: @@ -373,9 +407,9 @@ class Screen: bm = bm[:, reorder_cols] # Undouble pixels - return np.array(np.delete(bm, np.arange(0, bm.shape[1], 2), axis=1), - dtype=np.bool) - #return np.array(bm, dtype=np.bool) + # return np.array(np.delete(bm, np.arange(0, bm.shape[1], 2), axis=1), + # dtype=np.bool) + return np.array(bm, dtype=np.bool) def from_stream(self, stream: Iterator[int]) -> Tuple[int, int, int]: """Replay an opcode stream to build a screen image."""