- Stop masking out unchanged bytes explicitly and compare the full

source vs target frame.  This allows us to accumulate runs across
unchanged bytes, if they happen to be the same content value.

- introduce an allowable bit error when building runs, i.e. trade
  some slight imprecision for much more efficient decoding.  This gives
  a slight (~2%) reduction in similarity on my test frames at 140 pixels
  but improves the 280 pixel similarity significantly (~7%)

- so make 280 pixels the default for now

- once the run is complete, compute the median value of each bit in
  the run and use that as content byte.  I also tried mean which had
  exactly the same output

- runs will sometimes now span the (0x7x) screen holes so for now just
  ignore invalid addresses in _write
This commit is contained in:
kris 2019-01-03 17:38:47 +00:00
parent 1c13352106
commit c797852324

112
screen.py
View File

@ -43,7 +43,8 @@ class Opcode(enum.Enum):
class Frame: class Frame:
"""Bitmapped screen frame.""" """Bitmapped screen frame."""
XMAX = 140 # 280 # double-wide pixels to not worry about colour effects XMAX = 280 # 140 # double-wide pixels to not worry about colour
# effects
YMAX = 192 YMAX = 192
def __init__(self, bitmap: np.array = None): def __init__(self, bitmap: np.array = None):
@ -93,7 +94,7 @@ class Screen:
""" """
# Double each pixel horizontally # Double each pixel horizontally
pixels = np.repeat(bitmap, 2, axis=1) pixels = bitmap # np.repeat(bitmap, 2, axis=1)
# Insert zero column after every 7 # Insert zero column after every 7
for i in range(pixels.shape[1] // 7 - 1, -1, -1): for i in range(pixels.shape[1] // 7 - 1, -1, -1):
@ -134,10 +135,6 @@ class Screen:
# Target screen memory map for new frame # Target screen memory map for new frame
target = self._encode(frame.bitmap) target = self._encode(frame.bitmap)
# Compute difference from current frame
delta = np.bitwise_xor(self.screen, target)
delta = np.ma.masked_array(delta, np.logical_not(delta))
# Estimate number of opcodes that will end up fitting in the cycle # Estimate number of opcodes that will end up fitting in the cycle
# budget. # budget.
est_opcodes = int(cycle_budget / fullness / self.CYCLES[0]) est_opcodes = int(cycle_budget / fullness / self.CYCLES[0])
@ -145,44 +142,45 @@ class Screen:
# Sort by highest xor weight and take the estimated number of change # Sort by highest xor weight and take the estimated number of change
# operations # operations
changes = list( changes = list(
sorted(self.index_changes(delta, target), reverse=True) sorted(self.index_changes(self.screen, target), reverse=True)
)[:est_opcodes] )[:est_opcodes]
for b in self._heuristic_page_first_opcode_scheduler(changes): for b in self._heuristic_page_first_opcode_scheduler(changes):
yield b yield b
def index_changes(self, deltas: np.array, def index_changes(self, source: np.array,
target: np.array) -> Set[Tuple[int, int, int, int, int]]: target: np.array) -> Set[Tuple[int, int, int, int, int]]:
"""Transform encoded screen to sequence of change tuples. """Transform encoded screen to sequence of change tuples.
Change tuple is (xor_weight, page, offset, content) Change tuple is (xor_weight, page, offset, content)
""" """
# Compute difference from current frame
deltas = np.bitwise_xor(self.screen, target)
deltas = np.ma.masked_array(deltas, np.logical_not(deltas))
changes = set() changes = set()
# TODO: don't use 256 bytes if XMAX is smaller, or we may compute RLE
# Find runs in masked image # over the full page!
memmap = defaultdict(lambda: [(0, 0, 0)] * 256)
memmap = defaultdict(lambda: [(None, None)] * 256)
it = np.nditer(target, flags=['multi_index']) it = np.nditer(target, flags=['multi_index'])
while not it.finished: while not it.finished:
y, x_byte = it.multi_index y, x_byte = it.multi_index
# Skip masked values, i.e. unchanged in new frame
xor = deltas[y][x_byte]
if xor is np.ma.masked:
it.iternext()
continue
y_base = self.Y_TO_BASE_ADDR[self.page][y] y_base = self.Y_TO_BASE_ADDR[self.page][y]
page = y_base >> 8 page = y_base >> 8
# print("y=%d -> page=%02x" % (y, page)) # print("y=%d -> page=%02x" % (y, page))
xor_weight = hamming_weight(xor)
offset = y_base - (page << 8) + x_byte offset = y_base - (page << 8) + x_byte
content = np.asscalar(it[0])
memmap[page][offset] = (xor_weight, content) src_content = source[y][x_byte]
target_content = np.asscalar(it[0])
bits_different = hamming_weight(src_content ^ target_content)
memmap[page][offset] = (bits_different, src_content, target_content)
it.iternext() it.iternext()
for page, offsets in memmap.items(): for page, offsets in memmap.items():
@ -190,31 +188,62 @@ class Screen:
run_length = 0 run_length = 0
maybe_run = [] maybe_run = []
for offset, data in enumerate(offsets): for offset, data in enumerate(offsets):
xor_weight, content = data bits_different, src_content, target_content = data
if cur_content != content and cur_content is not None: # TODO: allowing odd bit errors introduces colour error
if maybe_run and hamming_weight(
cur_content ^ target_content) > 2:
# End of run # End of run
if run_length >= 4:
# Decide if it's worth emitting as a run vs single stores
# Number of changes in run for which >0 bits differ
num_changes = len([c for c in maybe_run if c[0]])
run_cost = self.CYCLES[Opcode.RLE] + run_length * 9
single_cost = self.CYCLES[0] * num_changes
#print("Run of %d cheaper than %d singles" % (
# run_length, num_changes))
# TODO: don't allow too much error to accumulate
if run_cost < single_cost:
# Compute median bit value over run
median_bits = np.median(
np.vstack(
np.unpackbits(
np.array(r[3], dtype=np.uint8)
)
for r in maybe_run
), axis=0
) > 0.5
typical_content = np.asscalar(np.packbits(median_bits))
total_xor = sum(ch[0] for ch in maybe_run) total_xor = sum(ch[0] for ch in maybe_run)
change = (total_xor, page, offset - run_length, start_offset = maybe_run[0][2]
cur_content, run_length)
#print("Found run of %d * %2x at %2x:%2x" % ( change = (total_xor, page, start_offset,
typical_content, run_length)
# print("Found run of %d * %2x at %2x:%2x" % (
# run_length, cur_content, page, offset - run_length) # run_length, cur_content, page, offset - run_length)
# ) # )
#print(maybe_run)
#print("change =", change)
changes.add(change) changes.add(change)
else: else:
changes.update(ch for ch in maybe_run) changes.update(ch for ch in maybe_run if ch[0])
maybe_run = [] maybe_run = []
run_length = 0 run_length = 0
cur_content = content cur_content = target_content
if cur_content is None: if cur_content is None:
cur_content = content cur_content = target_content
if content is not None: run_length += 1
run_length += 1 # if bits_different != 0:
maybe_run.append((xor_weight, page, offset, content, 1)) # # Only accumulate bytes for which src != target content
assert len(maybe_run) == run_length, (maybe_run, run_length) maybe_run.append(
(bits_different, page, offset, target_content, 1))
return changes return changes
@ -233,11 +262,11 @@ class Screen:
for b in self._emit(Opcode.SET_CONTENT, content): for b in self._emit(Opcode.SET_CONTENT, content):
yield b yield b
#print("page %d content %d offsets %s" % (page, content, # print("page %d content %d offsets %s" % (page, content,
# offsets)) # offsets))
for (run_length, offset) in sorted(offsets, reverse=True): for (run_length, offset) in sorted(offsets, reverse=True):
if run_length > 1: if run_length > 1:
#print("Offset %d run length %d" % (offset, run_length)) # print("Offset %d run length %d" % (offset, run_length))
for b in self._emit(Opcode.RLE, offset, run_length): for b in self._emit(Opcode.RLE, offset, run_length):
yield b yield b
for i in range(run_length): for i in range(run_length):
@ -357,7 +386,12 @@ class Screen:
def _write(self, addr: int, val: int) -> None: def _write(self, addr: int, val: int) -> None:
"""Updates screen image to set 0xaddr ^= val""" """Updates screen image to set 0xaddr ^= val"""
_, y, x = self.ADDR_TO_COORDS[addr] try:
_, y, x = self.ADDR_TO_COORDS[addr]
except KeyError:
# TODO: filter out screen holes
# print("Attempt to write to invalid offset %04x" % addr)
return
self.screen[y][x] = val self.screen[y][x] = val
def to_bitmap(self) -> np.array: def to_bitmap(self) -> np.array:
@ -373,9 +407,9 @@ class Screen:
bm = bm[:, reorder_cols] bm = bm[:, reorder_cols]
# Undouble pixels # Undouble pixels
return np.array(np.delete(bm, np.arange(0, bm.shape[1], 2), axis=1), # return np.array(np.delete(bm, np.arange(0, bm.shape[1], 2), axis=1),
dtype=np.bool) # dtype=np.bool)
#return np.array(bm, dtype=np.bool) return np.array(bm, dtype=np.bool)
def from_stream(self, stream: Iterator[int]) -> Tuple[int, int, int]: def from_stream(self, stream: Iterator[int]) -> Tuple[int, int, int]:
"""Replay an opcode stream to build a screen image.""" """Replay an opcode stream to build a screen image."""